From d70b23e7433a0b64b8cc99ab6d243f228a0002bc Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 09:20:17 +0800 Subject: [PATCH 01/16] cluster analyze --- .../analysis/analysis_facade.py | 14 +- .../analysis/communication_analysis.py | 120 ++++++++++++++++++ .../analysis/step_trace_time_analysis.py | 90 +++++++++++++ profiler/cluster_analyse/cluster_analysis.py | 19 ++- .../cluster_analyse/common_func/constant.py | 26 +++- .../common_func/file_manager.py | 37 +++--- .../communication_group_generator.py | 90 ++++++++++++- .../cluster_analyse/prof_bean/__init__.py | 14 ++ .../prof_bean/step_trace_time_bean.py | 39 ++++++ 9 files changed, 410 insertions(+), 39 deletions(-) create mode 100644 profiler/cluster_analyse/analysis/communication_analysis.py create mode 100644 profiler/cluster_analyse/analysis/step_trace_time_analysis.py create mode 100644 profiler/cluster_analyse/prof_bean/__init__.py create mode 100644 profiler/cluster_analyse/prof_bean/step_trace_time_bean.py diff --git a/profiler/cluster_analyse/analysis/analysis_facade.py b/profiler/cluster_analyse/analysis/analysis_facade.py index e6966d440..13a154df8 100644 --- a/profiler/cluster_analyse/analysis/analysis_facade.py +++ b/profiler/cluster_analyse/analysis/analysis_facade.py @@ -15,19 +15,17 @@ from cluster_data_preprocess.pytorch_data_preprocessor import PytorchDataPreprocessor from communication_group.communication_group_generator import CommunicationGroupGenerator +from common_func.file_manager import FileManager +from common_func.constant import Constant class AnalysisFacade: analysis_module = {} - def __init__(self, collection_path: str, data_map: dict, communication_group: dict): - self.collection_path = collection_path - self.data_map = data_map - self.communication_group = communication_group + def __init__(self, param: dict): + self.param = param def cluster_analyze(self): - data_map = PytorchDataPreprocessor(self.collection_path).get_data_map() - if not data_map: - print("Can not get rank info or profiling data.") - communication_group = CommunicationGroupGenerator(self.collection_path, data_map).generate() + + pass diff --git a/profiler/cluster_analyse/analysis/communication_analysis.py b/profiler/cluster_analyse/analysis/communication_analysis.py new file mode 100644 index 000000000..b59a7860b --- /dev/null +++ b/profiler/cluster_analyse/analysis/communication_analysis.py @@ -0,0 +1,120 @@ +# Copyright (c) 2023, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from common_func.constant import Constant +from collections import defaultdict +from common_func.file_manager import FileManager + + +class CommunicationAnalysis: + CLUSTER_COMMUNICATION_JSON = "cluster_commnunication.json" + + def __init__(self, param: dict): + self.collection_path = param.get(Constant.COLLECTION_PATH) + self.data_map = param.get(Constant.DATA_MAP) + self.collective_group_dict = param.get(Constant.COLLECTIVE_GROUP) + self.communication_ops = param.get(Constant.COMMUNICATION_OPS) + self.comm_ops_struct = {} + + def run(self): + self.split_op_by_group() + self.combine_ops_total_info() + self.dump_data() + + def dump_data(self): + if self.comm_ops_struct: + print("There is no final comm ops data generated") + return + FileManager.create_json_file(self.collection_path, self.comm_ops_struct, self.CLUSTER_COMMUNICATION_JSON) + + def split_op_by_group(self): + for single_op in self.communication_ops: + if single_op.get(Constant.COMM_OP_TYPE) == Constant.P2P: + rank_tup = Constant.P2P + else: + rank_tup = tuple(self.collective_group_dict.get(single_op.get(Constant.GROUP_NAME), [])) + rank_id = single_op.get(Constant.RANK_ID, 'N/A') + step_id = single_op.get(Constant.STEP_ID, 'N/A') + op_name = single_op.get(Constant.COMM_OP_NAME, 'N/A') + op_info = single_op.get(Constant.COMM_OP_INFO) + self.comm_ops_struct.setdefault(rank_tup, {}).setdefault(step_id, {}).\ + setdefault(op_name, {}).setdefault(rank_id, op_info) + + def combine_ops_total_info(self): + for rank_tup, group_dict in self.comm_ops_struct.items(): + for step_id, communication_ops in group_dict.items(): + self.compute_total_info(communication_ops) + + def compute_total_info(self, comm_ops: dict): + if not comm_ops: + return + total_rank_dict = {} + for communication_op, rank_dict in comm_ops.items(): + for rank_id, communication_op_info in rank_dict.items(): + total_rank_dict.setdefault(rank_id, {}).setdefault(self.COMMUNICATION_TIME_INFO, defaultdict(float)) + total_rank_dict.setdefault(rank_id, {}).setdefault(self.COMMUNICATION_BANDWIDTH_INFO, {}) + for com_info, com_info_dict in communication_op_info.items(): + if com_info == self.COMMUNICATION_TIME_INFO: + self.combine_time_info(com_info_dict, total_rank_dict[rank_id][com_info]) + if com_info == self.COMMUNICATION_BANDWIDTH_INFO: + self.combine_bandwidth_info(com_info_dict, total_rank_dict[rank_id][com_info]) + self.compute_time_ratio(total_rank_dict[rank_id][self.COMMUNICATION_TIME_INFO]) + self.compute_bandwidth_ratio(total_rank_dict[rank_id][self.COMMUNICATION_BANDWIDTH_INFO]) + comm_ops[Constant.TOTAL_OP_INFO] = total_rank_dict + + def combine_time_info(self, com_info_dict: dict, total_time_info_dict: dict): + ratio_list = [self.WAIT_TIME_RATIO, self.SYNCHRONIZATION_TIME_RATIO] + for time_info in com_info_dict: + if time_info not in ratio_list and time_info != self.START_TIMESTAMP: + total_time_info_dict[time_info] += com_info_dict.get(time_info) + + def combine_bandwidth_info(self, com_info_dict: dict, total_bandwidth_info_dict: dict): + add_list = [self.TRANSIT_TIME_MS, self.TRANSIT_SIZE_MB] + dict_list = [self.SIZE_DISTRIBUTION] + for transport_type, part_transport_dict in com_info_dict.items(): + if transport_type not in total_bandwidth_info_dict: + total_bandwidth_info_dict[transport_type] = { + self.TRANSIT_TIME_MS: 0, + self.TRANSIT_SIZE_MB: 0, + self.SIZE_DISTRIBUTION: defaultdict(lambda: [0, 0]) + } + for bandwidth_msg, value in part_transport_dict.items(): + if bandwidth_msg in add_list: + total_bandwidth_info_dict[transport_type][bandwidth_msg] += value + if bandwidth_msg in dict_list: + self.combine_size_distribution(value, total_bandwidth_info_dict[transport_type][bandwidth_msg]) + + def compute_time_ratio(self, total_time_info_dict: dict): + if total_time_info_dict[self.WAIT_TIME_MS] + total_time_info_dict[self.TRANSIT_TIME_MS] == 0: + total_time_info_dict[self.WAIT_TIME_RATIO] = 0 + else: + total_time_info_dict[self.WAIT_TIME_RATIO] = \ + round(total_time_info_dict[self.WAIT_TIME_MS] / + (total_time_info_dict[self.WAIT_TIME_MS] + total_time_info_dict[self.TRANSIT_TIME_MS]), 4) + if total_time_info_dict[self.SYNCHRONIZATION_TIME_MS] + total_time_info_dict[self.TRANSIT_TIME_MS] == 0: + total_time_info_dict[self.SYNCHRONIZATION_TIME_RATIO] = 0 + else: + total_time_info_dict[self.SYNCHRONIZATION_TIME_RATIO] = \ + round(total_time_info_dict[self.WAIT_TIME_MS] / + (total_time_info_dict[self.SYNCHRONIZATION_TIME_MS] + + total_time_info_dict[self.TRANSIT_TIME_MS]), 4) + + def compute_bandwidth_ratio(self, total_bandwidth_info_dict: dict): + for transport_type, bandwidth_dict in total_bandwidth_info_dict.items(): + if bandwidth_dict[self.TRANSIT_TIME_MS] == 0: + bandwidth_dict[self.BANDWIDTH_GB_S] = 0 + else: + bandwidth_dict[self.BANDWIDTH_GB_S] = \ + round(bandwidth_dict[self.TRANSIT_SIZE_MB] / bandwidth_dict[self.TRANSIT_TIME_MS], 4) diff --git a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py new file mode 100644 index 000000000..4b280e88a --- /dev/null +++ b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py @@ -0,0 +1,90 @@ +# Copyright (c) 2023, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +from common_func.constant import Constant +from collections import defaultdict +from common_func.file_manager import FileManager +from prof_bean.step_trace_time_bean import StepTraceTimeBean + + +class StepTraceTimeAnalysis: + CLUSTER_TRACE_TIME_CSV = "cluster_step_trace_time.csv" + + def __init__(self, param: dict): + self.collection_path = param.get(Constant.COLLECTION_PATH) + self.data_map = param.get(Constant.DATA_MAP) + self.communication_group = param.get(Constant.COMMUNICATION_GROUP) + self.step_time_dict = {} + self.step_data_list = [] + + @staticmethod + def get_max_data_row(data_group_list: list): + if not data_group_list: + return [] + ret = [] + for idx in len(data_group_list[0]): + max_val = 0 + for idy in len(data_group_list): + max_val = max(max_val, data_group_list[idy][idx]) + ret.append(max_val) + return ret + + def run(self): + self.load_step_trace_time_data() + self.analyze_step_time() + self.dump_data() + + def dump_data(self): + if not self.step_data_list: + print("Can't get step time info!") + self.step_data_list = [row[:3].extend(row[4:]) for row in self.step_data_list] + headers = self.get_headers() + FileManager.create_csv_file(self.collection_path, self.step_data_list, self.CLUSTER_TRACE_TIME_CSV, headers) + + def load_step_trace_time_data(self): + for rank_id, profiling_dir_path in self.data_map: + step_time_file = os.path.join(profiling_dir_path, Constant.SINGLE_OUTPUT, Constant.STEP_TIME_CSV) + if step_time_file: + self.step_time_dict[rank_id] = FileManager.read_csv_file(step_time_file, StepTraceTimeBean) + if not self.step_time_dict.get(rank_id): + print(f"rank {rank_id} does not have a valid step_trace_time.json.") + + def analyze_step_time(self, ): + for rank_id, data_bean in self.step_time_dict.items(): + self.step_data_list.append([data_bean.step, Constant.rank, rank_id] + data_bean.row) + stage_list = self.communication_group.get(Constant.P2P) + if not stage_list: + return + step_group_dict = {} + for data_list in self.step_data_list: + stage_group = 'None' + for stage in stage_list: + if data_list[1] in stage: + stage_group = stage + break + key = (data_list[0], stage_group) + step_group_dict.setdefault(key, []).append(data_list[3:]) + + for key, data_group_list in step_group_dict.items(): + self.step_time_dict.append([key[0], Constant.STAGE, key[1]] + self.get_max_data_row(data_group_list)) + + def get_headers(self): + if self.step_time_dict: + for rank in self.step_time_dict: + if self.step_time_dict.get(rank): + return self.step_time_dict[rank][0].all_headers + return [] diff --git a/profiler/cluster_analyse/cluster_analysis.py b/profiler/cluster_analyse/cluster_analysis.py index d1daea002..e2e8452e6 100644 --- a/profiler/cluster_analyse/cluster_analysis.py +++ b/profiler/cluster_analyse/cluster_analysis.py @@ -16,6 +16,8 @@ import argparse from cluster_data_preprocess.pytorch_data_preprocessor import PytorchDataPreprocessor from communication_group.communication_group_generator import CommunicationGroupGenerator +from common_func.constant import Constant +from common_func.file_manager import FileManager class Interface: @@ -25,8 +27,23 @@ class Interface: self.communication_group = {} def run(self): + FileManager.remove_and_make_output_dir(self.param.get(Constant.COLLECTION_PATH)) data_map = PytorchDataPreprocessor(self.collection_path).get_data_map() - communication_group = CommunicationGroupGenerator(self.collection_path, data_map).generate() + if not data_map: + print("Can not get rank info or profiling data.") + return + communication_group, collective_group_dict, communication_ops = \ + CommunicationGroupGenerator(self.collection_path, data_map).generate() + if not collective_group_dict: + print("Can not get communication info from ranks") + return + params = { + Constant.COLLECTION_PATH: self.collection_path, + Constant.DATA_MAP: data_map, + Constant.COLLECTIVE_GROUP: collective_group_dict, + Constant.COMMUNICATION_OPS: communication_ops, + Constant.COMMUNICATION_GROUP: communication_group + } if __name__ == "__main__": diff --git a/profiler/cluster_analyse/common_func/constant.py b/profiler/cluster_analyse/common_func/constant.py index e6cc07eb6..7fc82dc83 100644 --- a/profiler/cluster_analyse/common_func/constant.py +++ b/profiler/cluster_analyse/common_func/constant.py @@ -17,12 +17,34 @@ class Constant(object): # dir name FRAMEWORK_DIR = "FRAMEWORK" - OUTPUT_DIR = "ASCEND_PROFILER_OUTPUT" + CLUSTER_ANALYSIS_OUTPUT = "cluster_analysis_output" + SINGLE_OUTPUT = "ASCEND_PROFILER_OUTPUT" COMM_JSON = "communication.json" - STEP_TIME_CSV == "step_time.csv" + STEP_TIME_CSV = "step_trace_time.csv" # file authority FILE_AUTHORITY = 0o640 DIR_AUTHORITY = 0o750 MAX_JSON_SIZE = 1024 * 1024 * 1024 * 10 MAX_CSV_SIZE = 1024 * 1024 * 1024 * 5 + + # communication + P2P = "p2p" + COLLECTIVE = "collective" + STEP_ID = "step_id" + RANK_ID = "rank_id" + GROUP_NAME = "group_name" + COMM_OP_TYPE = "comm_op_type" + COMM_OP_NAME = "comm_op_name" + COMM_OP_INFO = "comm_op_info" + TOTAL_OP_INFO = "Total Op Info" + + # params + DATA_MAP = "data_map" + COLLECTIVE_GROUP = "collective_group" + COMMUNICATION_OPS = "communication_ops" + COLLECTION_PATH = 'collection_path' + + # step time + RANK = 'rank' + STAGE = 'stage' diff --git a/profiler/cluster_analyse/common_func/file_manager.py b/profiler/cluster_analyse/common_func/file_manager.py index 6f0c9f99e..8ed02f8e0 100644 --- a/profiler/cluster_analyse/common_func/file_manager.py +++ b/profiler/cluster_analyse/common_func/file_manager.py @@ -78,7 +78,7 @@ class FileManager: if file_size <= 0: return {} if file_size > Constant.MAX_JSON_SIZE: - print(f"The file size exceeds the preset value {Constant.MAX_CSV_SIZE / 1024 / 1024}MB, " + print(f"The file size exceeds the preset value {Constant.MAX_JSON_SIZE / 1024 / 1024}MB, " f"please check the file: {file_path}") return {} try: @@ -92,49 +92,44 @@ class FileManager: def create_csv_file(cls, profiler_path: str, data: list, file_name: str, headers: list = None) -> None: if not data: return - file_path = os.path.join(profiler_path, Constant.OUTPUT_DIR, file_name) + output_path = os.path.join(profiler_path, Constant.CLUSTER_ANALYSIS_OUTPUT) + output_file = os.path.join(output_path, file_name) + cls.check_file_or_directory_path(output_path) try: - with os.fdopen(os.open(file_path, os.O_WRONLY | os.O_CREAT, Constant.FILE_AUTHORITY), "w", + with os.fdopen(os.open(output_file, os.O_WRONLY | os.O_CREAT, Constant.FILE_AUTHORITY), "w", newline="") as file: writer = csv.writer(file) if headers: writer.writerow(headers) writer.writerows(data) except Exception: - raise RuntimeError(f"Can't create file: {file_path}") + raise RuntimeError(f"Can't create file: {output_file}") @classmethod def create_json_file(cls, profiler_path: str, data: list, file_name: str) -> None: if not data: return - file_path = os.path.join(profiler_path, Constant.OUTPUT_DIR, file_name) - cls.create_json_file_by_path(file_path, data) - - @classmethod - def create_json_file_by_path(cls, output_path: str, data: list) -> None: - dir_name = os.path.dirname(output_path) - if not os.path.exists(dir_name): - try: - os.makedirs(dir_name, mode=Constant.DIR_AUTHORITY) - except Exception: - raise RuntimeError(f"Can't create directory: {dir_name}") + output_path = os.path.join(profiler_path, Constant.CLUSTER_ANALYSIS_OUTPUT) + output_file = os.path.join(output_path, file_name) + cls.check_file_or_directory_path(os.path.join(output_path, Constant.CLUSTER_ANALYSIS_OUTPUT)) try: - with os.fdopen(os.open(output_path, os.O_WRONLY | os.O_CREAT, Constant.FILE_AUTHORITY), "w") as file: + with os.fdopen(os.open(output_file, os.O_WRONLY | os.O_CREAT, Constant.FILE_AUTHORITY), "w") as file: json.dump(data, file) except Exception: - raise RuntimeError(f"Can't create file: {output_path}") + raise RuntimeError(f"Can't create the file: {output_file}") @classmethod - def remove_and_make_output_dir(cls, profiler_path) -> None: - output_path = os.path.join(profiler_path, Constant.OUTPUT_DIR) + def create_output_dir(cls, collection_path: str) -> None: + output_path = os.path.join(collection_path, Constant.CLUSTER_ANALYSIS_OUTPUT) if os.path.isdir(output_path): try: + cls.check_file_or_directory_path(output_path) shutil.rmtree(output_path) os.makedirs(output_path, mode=Constant.DIR_AUTHORITY) except Exception: - raise RuntimeError(f"Can't delete files in the directory: {output_path}") + raise RuntimeError(f"Can't delete the directory: {output_path}") return try: os.makedirs(output_path, mode=Constant.DIR_AUTHORITY) except Exception: - raise RuntimeError(f"Can't create directory: {output_path}") + raise RuntimeError(f"Can't create the directory: {output_path}") diff --git a/profiler/cluster_analyse/communication_group/communication_group_generator.py b/profiler/cluster_analyse/communication_group/communication_group_generator.py index e95175b0b..f637297a8 100644 --- a/profiler/cluster_analyse/communication_group/communication_group_generator.py +++ b/profiler/cluster_analyse/communication_group/communication_group_generator.py @@ -16,31 +16,107 @@ import os from common_func.constant import Constant from common_func.file_manager import FileManager +from collections import defaultdict + class CommunicationGroupGenerator: + COMMUNICATION_GROUP_JSON = "communication_group.json" + def __init__(self, collection_path: str, data_map: dict): self.collection_path = collection_path self.data_map = data_map self.communication_group = {} + self.collective_group_dict = defaultdict(list) + self.p2p_group_dict = defaultdict(list) self.rank_comm_dir_dict = {} + self.communication_ops = [] def generate(self): self.load_communication_json() + self.analyze_communication_ops() + self.generate_collective_communication_group() + self.generate_p2p_communication_group() + FileManager.create_json_file(self.collection_path, self.communication_group, self.COMMUNICATION_GROUP_JSON) + return self.communication_group, self.collective_group_dict, self.communication_ops + + def analyze_communication_ops(self): + for rank_id, rank_id_dict in self.rank_comm_dir_dict.items(): + for step_id, step_id_dict in rank_id_dict.items(): + if isinstance(step_id_dict, dict): + print(f"rank{rank_id}'s communication.json has a wrong data struct.") + continue + for comm_op_type, comm_op_dict in step_id_dict.items(): + if comm_op_type == Constant.COLLECTIVE: + self.get_collective_ops_name(rank_id, comm_op_dict) + elif comm_op_type == Constant.P2P: + pass + else: + print(f"rank{rank_id}'s communication.json has no p2p or collective.") + continue + self.add_communication_ops(rank_id, step_id, comm_op_type, comm_op_dict) def load_communication_json(self): for rank_id, profiling_dir_path in self.data_map: - comm_dir = profiling_dir_path.get(Constant.COMM_JSON) + comm_dir = os.path.join(profiling_dir_path, Constant.SINGLE_OUTPUT, Constant.COMM_JSON) if comm_dir: self.rank_comm_dir_dict[rank_id] = FileManager.read_json_file(comm_dir) if not self.rank_comm_dir_dict.get(rank_id): - print(f"rank {rank_id} does not have a valid communication.json") - + print(f"rank {rank_id} does not have a valid communication.json.") def generate_collective_communication_group(self): - pass + self.communication_group[Constant.COLLECTIVE] = list(self.collective_group_dict.values()) def generate_p2p_communication_group(self): - pass + stage_group = {} + for rank_list in self.collective_group_dict.values(): + unioned_set = {} + for first_rank, stage in stage_group.items(): + if UnionFind.is_connected(set(rank_list), stage): + unioned_set = UnionFind.union(set(rank_list), stage) + del stage_group[first_rank] + break + if unioned_set: + stage_group[min(unioned_set)] = unioned_set + else: + stage_group[min(rank_list)] = set(rank_list) + first_rank_sort_list = sorted([first_rank for first_rank in stage_group]) + self.communication_group[Constant.P2P] = [stage_group.get(first_rank) for first_rank in first_rank_sort_list] + + def get_collective_ops_name(self, rank_id: int, comm_op_dict: dict): + for comm_op in comm_op_dict: + if comm_op.startswith('Total'): + continue + group_name = comm_op.split('@')[-1] + self.collective_group_dict[group_name].append(rank_id) + + def add_communication_ops(self, rank_id: str, step_id: str, comm_op_type: str, comm_op_dict: dict): + for comm_op in comm_op_dict: + if comm_op.startswith('Total'): + continue + group_name = comm_op.split('@')[-1] + self.communication_ops.append({ + Constant.RANK_ID: rank_id, + Constant.STEP_ID: step_id, + Constant.COMM_OP_TYPE: comm_op_type, + Constant.COMM_OP_NAME: comm_op, + Constant.GROUP_NAME: group_name, + Constant.COMM_OP_INFO: comm_op_dict.get(comm_op) + }) + + +class UnionFind(object): + """Disjoint Set Union""" + @classmethod + def union(cls, p: set, q: set): + """make p and q the same set""" + return p | q - def get_all_collective_ops_name(self): - pass + @classmethod + def is_connected(cls, p: set, q: set): + """ + check whether set p and set q are connected + """ + if p & q: + return True + else: + False diff --git a/profiler/cluster_analyse/prof_bean/__init__.py b/profiler/cluster_analyse/prof_bean/__init__.py new file mode 100644 index 000000000..8400fd5ec --- /dev/null +++ b/profiler/cluster_analyse/prof_bean/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) 2023, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/profiler/cluster_analyse/prof_bean/step_trace_time_bean.py b/profiler/cluster_analyse/prof_bean/step_trace_time_bean.py new file mode 100644 index 000000000..b9e4de17e --- /dev/null +++ b/profiler/cluster_analyse/prof_bean/step_trace_time_bean.py @@ -0,0 +1,39 @@ +# Copyright (c) 2023, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class StepTraceTimeBean: + STEP = "Step" + COMPLEMENT_HEADER = ["Type", "Index"] + + def __init__(self, data: list): + self._data = data + + @property + def row(self) -> list: + row = [] + for field_name in self._data.keys(): + if field_name == self.STEP: + continue + row.append(self._data.get(field_name, )) + return row + + @property + def step(self) -> float: + return float(self._data.get(self.STEP, '')) + + @property + def all_headers(self) -> list: + return list(self._data.keys()[0] + self.COMPLEMENT_HEADER + self._data.keys()[1:]) -- Gitee From 0f9c2491b09529c1895e2be22fdec26cb7039b77 Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 09:25:17 +0800 Subject: [PATCH 02/16] cluster analyze --- .../cluster_data_preprocess/pytorch_data_preprocessor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py b/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py index 7e0487a2c..44e373731 100644 --- a/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py +++ b/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py @@ -16,6 +16,7 @@ from collections import defaultdict import os + class PytorchDataPreprocessor: PROFILER_INFO_HEAD = 'profiler_info_' PROFILER_INFO_EXTENSION = '.json' -- Gitee From 354972b6780ed7331dd888a276b4d89caf76effd Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 11:35:46 +0800 Subject: [PATCH 03/16] cluster analyze --- profiler/cluster_analyse/cluster_analysis.py | 2 +- profiler/cluster_analyse/common_func/file_manager.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/profiler/cluster_analyse/cluster_analysis.py b/profiler/cluster_analyse/cluster_analysis.py index e2e8452e6..3ffa8c610 100644 --- a/profiler/cluster_analyse/cluster_analysis.py +++ b/profiler/cluster_analyse/cluster_analysis.py @@ -27,7 +27,7 @@ class Interface: self.communication_group = {} def run(self): - FileManager.remove_and_make_output_dir(self.param.get(Constant.COLLECTION_PATH)) + FileManager.create_output_dir(self.collection_path) data_map = PytorchDataPreprocessor(self.collection_path).get_data_map() if not data_map: print("Can not get rank info or profiling data.") diff --git a/profiler/cluster_analyse/common_func/file_manager.py b/profiler/cluster_analyse/common_func/file_manager.py index 8ed02f8e0..b380be37b 100644 --- a/profiler/cluster_analyse/common_func/file_manager.py +++ b/profiler/cluster_analyse/common_func/file_manager.py @@ -94,7 +94,7 @@ class FileManager: return output_path = os.path.join(profiler_path, Constant.CLUSTER_ANALYSIS_OUTPUT) output_file = os.path.join(output_path, file_name) - cls.check_file_or_directory_path(output_path) + cls.check_file_or_directory_path(output_path, isdir=True) try: with os.fdopen(os.open(output_file, os.O_WRONLY | os.O_CREAT, Constant.FILE_AUTHORITY), "w", newline="") as file: @@ -111,7 +111,7 @@ class FileManager: return output_path = os.path.join(profiler_path, Constant.CLUSTER_ANALYSIS_OUTPUT) output_file = os.path.join(output_path, file_name) - cls.check_file_or_directory_path(os.path.join(output_path, Constant.CLUSTER_ANALYSIS_OUTPUT)) + cls.check_file_or_directory_path(os.path.join(output_path, Constant.CLUSTER_ANALYSIS_OUTPUT), isdir=True) try: with os.fdopen(os.open(output_file, os.O_WRONLY | os.O_CREAT, Constant.FILE_AUTHORITY), "w") as file: json.dump(data, file) @@ -123,7 +123,7 @@ class FileManager: output_path = os.path.join(collection_path, Constant.CLUSTER_ANALYSIS_OUTPUT) if os.path.isdir(output_path): try: - cls.check_file_or_directory_path(output_path) + cls.check_file_or_directory_path(output_path, isdir=True) shutil.rmtree(output_path) os.makedirs(output_path, mode=Constant.DIR_AUTHORITY) except Exception: -- Gitee From b18eb511636bc856455a8a11ef7a236f3009ada1 Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 15:35:16 +0800 Subject: [PATCH 04/16] cluster analyze --- .../cluster_analyse/common_func/constant.py | 3 ++- .../common_func/file_manager.py | 4 ++-- .../communication_group_generator.py | 21 ++++++++++--------- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/profiler/cluster_analyse/common_func/constant.py b/profiler/cluster_analyse/common_func/constant.py index 7fc82dc83..5532bfb9c 100644 --- a/profiler/cluster_analyse/common_func/constant.py +++ b/profiler/cluster_analyse/common_func/constant.py @@ -43,7 +43,8 @@ class Constant(object): DATA_MAP = "data_map" COLLECTIVE_GROUP = "collective_group" COMMUNICATION_OPS = "communication_ops" - COLLECTION_PATH = 'collection_path' + COLLECTION_PATH = "collection_path" + COMMUNICATION_GROUP = "communication_group" # step time RANK = 'rank' diff --git a/profiler/cluster_analyse/common_func/file_manager.py b/profiler/cluster_analyse/common_func/file_manager.py index b380be37b..5ee9197bf 100644 --- a/profiler/cluster_analyse/common_func/file_manager.py +++ b/profiler/cluster_analyse/common_func/file_manager.py @@ -106,12 +106,12 @@ class FileManager: raise RuntimeError(f"Can't create file: {output_file}") @classmethod - def create_json_file(cls, profiler_path: str, data: list, file_name: str) -> None: + def create_json_file(cls, profiler_path: str, data: dict, file_name: str) -> None: if not data: return output_path = os.path.join(profiler_path, Constant.CLUSTER_ANALYSIS_OUTPUT) output_file = os.path.join(output_path, file_name) - cls.check_file_or_directory_path(os.path.join(output_path, Constant.CLUSTER_ANALYSIS_OUTPUT), isdir=True) + cls.check_file_or_directory_path(output_path, isdir=True) try: with os.fdopen(os.open(output_file, os.O_WRONLY | os.O_CREAT, Constant.FILE_AUTHORITY), "w") as file: json.dump(data, file) diff --git a/profiler/cluster_analyse/communication_group/communication_group_generator.py b/profiler/cluster_analyse/communication_group/communication_group_generator.py index f637297a8..bb156b2c8 100644 --- a/profiler/cluster_analyse/communication_group/communication_group_generator.py +++ b/profiler/cluster_analyse/communication_group/communication_group_generator.py @@ -26,7 +26,7 @@ class CommunicationGroupGenerator: self.collection_path = collection_path self.data_map = data_map self.communication_group = {} - self.collective_group_dict = defaultdict(list) + self.collective_group_dict = defaultdict(set) self.p2p_group_dict = defaultdict(list) self.rank_comm_dir_dict = {} self.communication_ops = [] @@ -42,7 +42,7 @@ class CommunicationGroupGenerator: def analyze_communication_ops(self): for rank_id, rank_id_dict in self.rank_comm_dir_dict.items(): for step_id, step_id_dict in rank_id_dict.items(): - if isinstance(step_id_dict, dict): + if not isinstance(step_id_dict, dict): print(f"rank{rank_id}'s communication.json has a wrong data struct.") continue for comm_op_type, comm_op_dict in step_id_dict.items(): @@ -56,7 +56,7 @@ class CommunicationGroupGenerator: self.add_communication_ops(rank_id, step_id, comm_op_type, comm_op_dict) def load_communication_json(self): - for rank_id, profiling_dir_path in self.data_map: + for rank_id, profiling_dir_path in self.data_map.items(): comm_dir = os.path.join(profiling_dir_path, Constant.SINGLE_OUTPUT, Constant.COMM_JSON) if comm_dir: self.rank_comm_dir_dict[rank_id] = FileManager.read_json_file(comm_dir) @@ -64,30 +64,31 @@ class CommunicationGroupGenerator: print(f"rank {rank_id} does not have a valid communication.json.") def generate_collective_communication_group(self): - self.communication_group[Constant.COLLECTIVE] = list(self.collective_group_dict.values()) + self.communication_group[Constant.COLLECTIVE] = [list(group) for group in self.collective_group_dict.values()] def generate_p2p_communication_group(self): stage_group = {} - for rank_list in self.collective_group_dict.values(): + for rank_set in self.collective_group_dict.values(): unioned_set = {} for first_rank, stage in stage_group.items(): - if UnionFind.is_connected(set(rank_list), stage): - unioned_set = UnionFind.union(set(rank_list), stage) + if UnionFind.is_connectedt(rank_set, stage): + unioned_set = UnionFind.union(rank_set, stage) del stage_group[first_rank] break if unioned_set: stage_group[min(unioned_set)] = unioned_set else: - stage_group[min(rank_list)] = set(rank_list) + stage_group[min(rank_set)] = rank_set first_rank_sort_list = sorted([first_rank for first_rank in stage_group]) - self.communication_group[Constant.P2P] = [stage_group.get(first_rank) for first_rank in first_rank_sort_list] + self.communication_group[Constant.P2P] = \ + [list(stage_group.get(first_rank, {})) for first_rank in first_rank_sort_list] def get_collective_ops_name(self, rank_id: int, comm_op_dict: dict): for comm_op in comm_op_dict: if comm_op.startswith('Total'): continue group_name = comm_op.split('@')[-1] - self.collective_group_dict[group_name].append(rank_id) + self.collective_group_dict[group_name].add(rank_id) def add_communication_ops(self, rank_id: str, step_id: str, comm_op_type: str, comm_op_dict: dict): for comm_op in comm_op_dict: -- Gitee From 44fab58466dc9b876b99e6d6735d0f8bd3c27556 Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 16:26:37 +0800 Subject: [PATCH 05/16] cluster analyze --- .../cluster_analyse/analysis/analysis_facade.py | 13 +++++-------- profiler/cluster_analyse/cluster_analysis.py | 2 ++ 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/profiler/cluster_analyse/analysis/analysis_facade.py b/profiler/cluster_analyse/analysis/analysis_facade.py index 13a154df8..a09a065a5 100644 --- a/profiler/cluster_analyse/analysis/analysis_facade.py +++ b/profiler/cluster_analyse/analysis/analysis_facade.py @@ -13,19 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -from cluster_data_preprocess.pytorch_data_preprocessor import PytorchDataPreprocessor -from communication_group.communication_group_generator import CommunicationGroupGenerator -from common_func.file_manager import FileManager -from common_func.constant import Constant +from analysis.communication_analysis import CommunicationAnalysis +from analysis.step_trace_time_analysis import StepTraceTimeAnalysis class AnalysisFacade: - analysis_module = {} + analysis_module = {CommunicationAnalysis, StepTraceTimeAnalysis} def __init__(self, param: dict): self.param = param def cluster_analyze(self): - - pass - + for analysis in self.analysis_module: + analysis.run() diff --git a/profiler/cluster_analyse/cluster_analysis.py b/profiler/cluster_analyse/cluster_analysis.py index 3ffa8c610..861bdeac6 100644 --- a/profiler/cluster_analyse/cluster_analysis.py +++ b/profiler/cluster_analyse/cluster_analysis.py @@ -18,6 +18,7 @@ from cluster_data_preprocess.pytorch_data_preprocessor import PytorchDataPreproc from communication_group.communication_group_generator import CommunicationGroupGenerator from common_func.constant import Constant from common_func.file_manager import FileManager +from analysis.analysis_facade import AnalysisFacade class Interface: @@ -44,6 +45,7 @@ class Interface: Constant.COMMUNICATION_OPS: communication_ops, Constant.COMMUNICATION_GROUP: communication_group } + AnalysisFacade(params).cluster_analyze() if __name__ == "__main__": -- Gitee From 2623ff35575e5e3e3d4d7c27b957e2382dc684ec Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 16:29:55 +0800 Subject: [PATCH 06/16] cluster analyze --- .../cluster_analyse/analysis/analysis_facade.py | 2 +- profiler/cluster_analyse/common_func/constant.py | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/profiler/cluster_analyse/analysis/analysis_facade.py b/profiler/cluster_analyse/analysis/analysis_facade.py index a09a065a5..ff352c94e 100644 --- a/profiler/cluster_analyse/analysis/analysis_facade.py +++ b/profiler/cluster_analyse/analysis/analysis_facade.py @@ -25,4 +25,4 @@ class AnalysisFacade: def cluster_analyze(self): for analysis in self.analysis_module: - analysis.run() + analysis(self.param).run() diff --git a/profiler/cluster_analyse/common_func/constant.py b/profiler/cluster_analyse/common_func/constant.py index 5532bfb9c..aa3f6b7a9 100644 --- a/profiler/cluster_analyse/common_func/constant.py +++ b/profiler/cluster_analyse/common_func/constant.py @@ -38,6 +38,20 @@ class Constant(object): COMM_OP_NAME = "comm_op_name" COMM_OP_INFO = "comm_op_info" TOTAL_OP_INFO = "Total Op Info" + COMMUNICATION_TIME_INFO = "Communication Time Info" + START_TIMESTAMP = "Start Timestamp(us)" + COMMUNICATION_BANDWIDTH_INFO = "Communication Bandwidth Info" + HCOM_SEND = "hcom_send" + HCOM_RECEIVE = "hcom_receive" + SYNCHRONIZATION_TIME_RATIO = "Synchronization Time Ratio" + SYNCHRONIZATION_TIME_MS = "Synchronization Time(ms)" + WAIT_TIME_RATIO = "Wait Time Ratio" + TRANSIT_TIME_MS = "Transit Time(ms)" + TRANSIT_SIZE_MB = "Transit Size(MB)" + SIZE_DISTRIBUTION = "Size Distribution" + WAIT_TIME_MS = "Wait Time(ms)" + BANDWIDTH_GB_S = "Bandwidth(GB/s)" + COMMUNICATION = "communication.json" # params DATA_MAP = "data_map" -- Gitee From 52d671a4dd7c1192de3ce5ca707a6e7bc817dd14 Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 16:32:11 +0800 Subject: [PATCH 07/16] cluster analyze --- .../analysis/communication_analysis.py | 56 +++++++++---------- .../analysis/step_trace_time_analysis.py | 2 +- 2 files changed, 29 insertions(+), 29 deletions(-) diff --git a/profiler/cluster_analyse/analysis/communication_analysis.py b/profiler/cluster_analyse/analysis/communication_analysis.py index b59a7860b..040ebbff5 100644 --- a/profiler/cluster_analyse/analysis/communication_analysis.py +++ b/profiler/cluster_analyse/analysis/communication_analysis.py @@ -63,32 +63,32 @@ class CommunicationAnalysis: total_rank_dict = {} for communication_op, rank_dict in comm_ops.items(): for rank_id, communication_op_info in rank_dict.items(): - total_rank_dict.setdefault(rank_id, {}).setdefault(self.COMMUNICATION_TIME_INFO, defaultdict(float)) - total_rank_dict.setdefault(rank_id, {}).setdefault(self.COMMUNICATION_BANDWIDTH_INFO, {}) + total_rank_dict.setdefault(rank_id, {}).setdefault(Constant.COMMUNICATION_TIME_INFO, defaultdict(float)) + total_rank_dict.setdefault(rank_id, {}).setdefault(Constant.COMMUNICATION_BANDWIDTH_INFO, {}) for com_info, com_info_dict in communication_op_info.items(): - if com_info == self.COMMUNICATION_TIME_INFO: + if com_info == Constant.COMMUNICATION_TIME_INFO: self.combine_time_info(com_info_dict, total_rank_dict[rank_id][com_info]) - if com_info == self.COMMUNICATION_BANDWIDTH_INFO: + if com_info == Constant.COMMUNICATION_BANDWIDTH_INFO: self.combine_bandwidth_info(com_info_dict, total_rank_dict[rank_id][com_info]) - self.compute_time_ratio(total_rank_dict[rank_id][self.COMMUNICATION_TIME_INFO]) - self.compute_bandwidth_ratio(total_rank_dict[rank_id][self.COMMUNICATION_BANDWIDTH_INFO]) + self.compute_time_ratio(total_rank_dict[rank_id][Constant.COMMUNICATION_TIME_INFO]) + self.compute_bandwidth_ratio(total_rank_dict[rank_id][Constant.COMMUNICATION_BANDWIDTH_INFO]) comm_ops[Constant.TOTAL_OP_INFO] = total_rank_dict def combine_time_info(self, com_info_dict: dict, total_time_info_dict: dict): - ratio_list = [self.WAIT_TIME_RATIO, self.SYNCHRONIZATION_TIME_RATIO] + ratio_list = [Constant.WAIT_TIME_RATIO, Constant.SYNCHRONIZATION_TIME_RATIO] for time_info in com_info_dict: - if time_info not in ratio_list and time_info != self.START_TIMESTAMP: + if time_info not in ratio_list and time_info != Constant.START_TIMESTAMP: total_time_info_dict[time_info] += com_info_dict.get(time_info) def combine_bandwidth_info(self, com_info_dict: dict, total_bandwidth_info_dict: dict): - add_list = [self.TRANSIT_TIME_MS, self.TRANSIT_SIZE_MB] - dict_list = [self.SIZE_DISTRIBUTION] + add_list = [Constant.TRANSIT_TIME_MS, Constant.TRANSIT_SIZE_MB] + dict_list = [Constant.SIZE_DISTRIBUTION] for transport_type, part_transport_dict in com_info_dict.items(): if transport_type not in total_bandwidth_info_dict: total_bandwidth_info_dict[transport_type] = { - self.TRANSIT_TIME_MS: 0, - self.TRANSIT_SIZE_MB: 0, - self.SIZE_DISTRIBUTION: defaultdict(lambda: [0, 0]) + Constant.TRANSIT_TIME_MS: 0, + Constant.TRANSIT_SIZE_MB: 0, + Constant.SIZE_DISTRIBUTION: defaultdict(lambda: [0, 0]) } for bandwidth_msg, value in part_transport_dict.items(): if bandwidth_msg in add_list: @@ -97,24 +97,24 @@ class CommunicationAnalysis: self.combine_size_distribution(value, total_bandwidth_info_dict[transport_type][bandwidth_msg]) def compute_time_ratio(self, total_time_info_dict: dict): - if total_time_info_dict[self.WAIT_TIME_MS] + total_time_info_dict[self.TRANSIT_TIME_MS] == 0: - total_time_info_dict[self.WAIT_TIME_RATIO] = 0 + if total_time_info_dict[Constant.WAIT_TIME_MS] + total_time_info_dict[Constant.TRANSIT_TIME_MS] == 0: + total_time_info_dict[Constant.WAIT_TIME_RATIO] = 0 else: - total_time_info_dict[self.WAIT_TIME_RATIO] = \ - round(total_time_info_dict[self.WAIT_TIME_MS] / - (total_time_info_dict[self.WAIT_TIME_MS] + total_time_info_dict[self.TRANSIT_TIME_MS]), 4) - if total_time_info_dict[self.SYNCHRONIZATION_TIME_MS] + total_time_info_dict[self.TRANSIT_TIME_MS] == 0: - total_time_info_dict[self.SYNCHRONIZATION_TIME_RATIO] = 0 + total_time_info_dict[Constant.WAIT_TIME_RATIO] = \ + round(total_time_info_dict[Constant.WAIT_TIME_MS] / + (total_time_info_dict[Constant.WAIT_TIME_MS] + total_time_info_dict[Constant.TRANSIT_TIME_MS]), 4) + if total_time_info_dict[Constant.SYNCHRONIZATION_TIME_MS] + total_time_info_dict[Constant.TRANSIT_TIME_MS] == 0: + total_time_info_dict[Constant.SYNCHRONIZATION_TIME_RATIO] = 0 else: - total_time_info_dict[self.SYNCHRONIZATION_TIME_RATIO] = \ - round(total_time_info_dict[self.WAIT_TIME_MS] / - (total_time_info_dict[self.SYNCHRONIZATION_TIME_MS] + - total_time_info_dict[self.TRANSIT_TIME_MS]), 4) + total_time_info_dict[Constant.SYNCHRONIZATION_TIME_RATIO] = \ + round(total_time_info_dict[Constant.WAIT_TIME_MS] / + (total_time_info_dict[Constant.SYNCHRONIZATION_TIME_MS] + + total_time_info_dict[Constant.TRANSIT_TIME_MS]), 4) def compute_bandwidth_ratio(self, total_bandwidth_info_dict: dict): for transport_type, bandwidth_dict in total_bandwidth_info_dict.items(): - if bandwidth_dict[self.TRANSIT_TIME_MS] == 0: - bandwidth_dict[self.BANDWIDTH_GB_S] = 0 + if bandwidth_dict[Constant.TRANSIT_TIME_MS] == 0: + bandwidth_dict[Constant.BANDWIDTH_GB_S] = 0 else: - bandwidth_dict[self.BANDWIDTH_GB_S] = \ - round(bandwidth_dict[self.TRANSIT_SIZE_MB] / bandwidth_dict[self.TRANSIT_TIME_MS], 4) + bandwidth_dict[Constant.BANDWIDTH_GB_S] = \ + round(bandwidth_dict[Constant.TRANSIT_SIZE_MB] / bandwidth_dict[Constant.TRANSIT_TIME_MS], 4) diff --git a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py index 4b280e88a..4e9d479c4 100644 --- a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py +++ b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py @@ -56,7 +56,7 @@ class StepTraceTimeAnalysis: FileManager.create_csv_file(self.collection_path, self.step_data_list, self.CLUSTER_TRACE_TIME_CSV, headers) def load_step_trace_time_data(self): - for rank_id, profiling_dir_path in self.data_map: + for rank_id, profiling_dir_path in self.data_map.items(): step_time_file = os.path.join(profiling_dir_path, Constant.SINGLE_OUTPUT, Constant.STEP_TIME_CSV) if step_time_file: self.step_time_dict[rank_id] = FileManager.read_csv_file(step_time_file, StepTraceTimeBean) -- Gitee From 2faa2e618b29798876a006124e9da8da31778981 Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 16:36:01 +0800 Subject: [PATCH 08/16] cluster analyze --- profiler/cluster_analyse/analysis/communication_analysis.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/profiler/cluster_analyse/analysis/communication_analysis.py b/profiler/cluster_analyse/analysis/communication_analysis.py index 040ebbff5..889384e9d 100644 --- a/profiler/cluster_analyse/analysis/communication_analysis.py +++ b/profiler/cluster_analyse/analysis/communication_analysis.py @@ -28,6 +28,12 @@ class CommunicationAnalysis: self.communication_ops = param.get(Constant.COMMUNICATION_OPS) self.comm_ops_struct = {} + @staticmethod + def combine_size_distribution(op_dict: dict, total_dict: dict): + for size, size_info in op_dict.items(): + total_dict[size][0] += size_info[0] + total_dict[size][1] += size_info[1] + def run(self): self.split_op_by_group() self.combine_ops_total_info() -- Gitee From 49163a834649c806d11a06579ac3eeb33251c461 Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 16:38:09 +0800 Subject: [PATCH 09/16] cluster analyze --- .../cluster_analyse/analysis/step_trace_time_analysis.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py index 4e9d479c4..8eae2b559 100644 --- a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py +++ b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py @@ -64,8 +64,9 @@ class StepTraceTimeAnalysis: print(f"rank {rank_id} does not have a valid step_trace_time.json.") def analyze_step_time(self, ): - for rank_id, data_bean in self.step_time_dict.items(): - self.step_data_list.append([data_bean.step, Constant.rank, rank_id] + data_bean.row) + for rank_id, data_bean_list in self.step_time_dict.items(): + for data_bean in data_bean_list: + self.step_data_list.append([data_bean.step, Constant.rank, rank_id] + data_bean.row) stage_list = self.communication_group.get(Constant.P2P) if not stage_list: return -- Gitee From 9e7f9c684eff5d28ce8b67bd1f56d9ef2767710b Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 16:41:36 +0800 Subject: [PATCH 10/16] cluster analyze --- profiler/cluster_analyse/analysis/step_trace_time_analysis.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py index 8eae2b559..289de153d 100644 --- a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py +++ b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py @@ -66,7 +66,7 @@ class StepTraceTimeAnalysis: def analyze_step_time(self, ): for rank_id, data_bean_list in self.step_time_dict.items(): for data_bean in data_bean_list: - self.step_data_list.append([data_bean.step, Constant.rank, rank_id] + data_bean.row) + self.step_data_list.append([data_bean.step, Constant.RANK, rank_id] + data_bean.row) stage_list = self.communication_group.get(Constant.P2P) if not stage_list: return @@ -81,7 +81,7 @@ class StepTraceTimeAnalysis: step_group_dict.setdefault(key, []).append(data_list[3:]) for key, data_group_list in step_group_dict.items(): - self.step_time_dict.append([key[0], Constant.STAGE, key[1]] + self.get_max_data_row(data_group_list)) + self.step_time_list.append([key[0], Constant.STAGE, key[1]] + self.get_max_data_row(data_group_list)) def get_headers(self): if self.step_time_dict: -- Gitee From cc1931326ae1f708d8892d79114c39d3b309e32c Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 16:46:08 +0800 Subject: [PATCH 11/16] cluster analyze --- .../cluster_analyse/analysis/step_trace_time_analysis.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py index 289de153d..30e884eab 100644 --- a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py +++ b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py @@ -36,9 +36,9 @@ class StepTraceTimeAnalysis: if not data_group_list: return [] ret = [] - for idx in len(data_group_list[0]): + for idx in range(len(data_group_list[0])): max_val = 0 - for idy in len(data_group_list): + for idy in range(len(data_group_list)): max_val = max(max_val, data_group_list[idy][idx]) ret.append(max_val) return ret @@ -81,7 +81,7 @@ class StepTraceTimeAnalysis: step_group_dict.setdefault(key, []).append(data_list[3:]) for key, data_group_list in step_group_dict.items(): - self.step_time_list.append([key[0], Constant.STAGE, key[1]] + self.get_max_data_row(data_group_list)) + self.step_data_list.append([key[0], Constant.STAGE, key[1]] + self.get_max_data_row(data_group_list)) def get_headers(self): if self.step_time_dict: -- Gitee From 7cc803c5f7e3d1da08754237facbc47e0a1c866d Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 16:50:40 +0800 Subject: [PATCH 12/16] cluster analyze --- profiler/cluster_analyse/prof_bean/step_trace_time_bean.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/profiler/cluster_analyse/prof_bean/step_trace_time_bean.py b/profiler/cluster_analyse/prof_bean/step_trace_time_bean.py index b9e4de17e..80f09d561 100644 --- a/profiler/cluster_analyse/prof_bean/step_trace_time_bean.py +++ b/profiler/cluster_analyse/prof_bean/step_trace_time_bean.py @@ -16,7 +16,7 @@ class StepTraceTimeBean: STEP = "Step" - COMPLEMENT_HEADER = ["Type", "Index"] + COMPLEMENT_HEADER = ["Step", "Type", "Index"] def __init__(self, data: list): self._data = data @@ -27,7 +27,7 @@ class StepTraceTimeBean: for field_name in self._data.keys(): if field_name == self.STEP: continue - row.append(self._data.get(field_name, )) + row.append(float(self._data.get(field_name, ))) return row @property @@ -36,4 +36,4 @@ class StepTraceTimeBean: @property def all_headers(self) -> list: - return list(self._data.keys()[0] + self.COMPLEMENT_HEADER + self._data.keys()[1:]) + return self.COMPLEMENT_HEADER + list(self._data.keys())[1:] -- Gitee From 90e2137d09c0688cc00903ed69f1afd1a7660f0d Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 16:53:14 +0800 Subject: [PATCH 13/16] cluster analyze --- profiler/cluster_analyse/analysis/step_trace_time_analysis.py | 1 - 1 file changed, 1 deletion(-) diff --git a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py index 30e884eab..d03991b50 100644 --- a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py +++ b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py @@ -51,7 +51,6 @@ class StepTraceTimeAnalysis: def dump_data(self): if not self.step_data_list: print("Can't get step time info!") - self.step_data_list = [row[:3].extend(row[4:]) for row in self.step_data_list] headers = self.get_headers() FileManager.create_csv_file(self.collection_path, self.step_data_list, self.CLUSTER_TRACE_TIME_CSV, headers) -- Gitee From 660e205c4d7621ac5f40951051dfe98d79aa5561 Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 17:39:01 +0800 Subject: [PATCH 14/16] cluster analyze --- profiler/cluster_analyse/analysis/communication_analysis.py | 2 +- .../cluster_analyse/analysis/step_trace_time_analysis.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/profiler/cluster_analyse/analysis/communication_analysis.py b/profiler/cluster_analyse/analysis/communication_analysis.py index 889384e9d..9e0e1b60d 100644 --- a/profiler/cluster_analyse/analysis/communication_analysis.py +++ b/profiler/cluster_analyse/analysis/communication_analysis.py @@ -40,7 +40,7 @@ class CommunicationAnalysis: self.dump_data() def dump_data(self): - if self.comm_ops_struct: + if not self.comm_ops_struct: print("There is no final comm ops data generated") return FileManager.create_json_file(self.collection_path, self.comm_ops_struct, self.CLUSTER_COMMUNICATION_JSON) diff --git a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py index d03991b50..5f0497811 100644 --- a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py +++ b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py @@ -62,7 +62,7 @@ class StepTraceTimeAnalysis: if not self.step_time_dict.get(rank_id): print(f"rank {rank_id} does not have a valid step_trace_time.json.") - def analyze_step_time(self, ): + def analyze_step_time(self): for rank_id, data_bean_list in self.step_time_dict.items(): for data_bean in data_bean_list: self.step_data_list.append([data_bean.step, Constant.RANK, rank_id] + data_bean.row) @@ -73,8 +73,8 @@ class StepTraceTimeAnalysis: for data_list in self.step_data_list: stage_group = 'None' for stage in stage_list: - if data_list[1] in stage: - stage_group = stage + if data_list[2] in stage: + stage_group = tuple(stage) break key = (data_list[0], stage_group) step_group_dict.setdefault(key, []).append(data_list[3:]) -- Gitee From 4b9c00ff668b185362cc2d5c84f3f4f5a8a9d8aa Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 17:58:06 +0800 Subject: [PATCH 15/16] cluster analyze --- profiler/cluster_analyse/analysis/communication_analysis.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/profiler/cluster_analyse/analysis/communication_analysis.py b/profiler/cluster_analyse/analysis/communication_analysis.py index 9e0e1b60d..10e56b24c 100644 --- a/profiler/cluster_analyse/analysis/communication_analysis.py +++ b/profiler/cluster_analyse/analysis/communication_analysis.py @@ -43,7 +43,10 @@ class CommunicationAnalysis: if not self.comm_ops_struct: print("There is no final comm ops data generated") return - FileManager.create_json_file(self.collection_path, self.comm_ops_struct, self.CLUSTER_COMMUNICATION_JSON) + output_comm_data = {} + for key in self.comm_ops_struct: + output_comm_data[str(key)] = self.comm_ops_struct.get(key) + FileManager.create_json_file(self.collection_path, output_comm_data, self.CLUSTER_COMMUNICATION_JSON) def split_op_by_group(self): for single_op in self.communication_ops: -- Gitee From d29b06ca415000445f661626e039fe9813a0d7e6 Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 20:51:34 +0800 Subject: [PATCH 16/16] cluster analyze --- .../analysis/communication_analysis.py | 7 ++++--- .../pytorch_data_preprocessor.py | 7 +++---- profiler/cluster_analyse/common_func/constant.py | 1 + .../cluster_analyse/common_func/file_manager.py | 16 ++++++++++++---- .../communication_group_generator.py | 14 ++++++++------ 5 files changed, 28 insertions(+), 17 deletions(-) diff --git a/profiler/cluster_analyse/analysis/communication_analysis.py b/profiler/cluster_analyse/analysis/communication_analysis.py index 10e56b24c..19f2bf7d5 100644 --- a/profiler/cluster_analyse/analysis/communication_analysis.py +++ b/profiler/cluster_analyse/analysis/communication_analysis.py @@ -19,7 +19,7 @@ from common_func.file_manager import FileManager class CommunicationAnalysis: - CLUSTER_COMMUNICATION_JSON = "cluster_commnunication.json" + CLUSTER_COMMUNICATION_JSON = "cluster_communication.json" def __init__(self, param: dict): self.collection_path = param.get(Constant.COLLECTION_PATH) @@ -79,8 +79,9 @@ class CommunicationAnalysis: self.combine_time_info(com_info_dict, total_rank_dict[rank_id][com_info]) if com_info == Constant.COMMUNICATION_BANDWIDTH_INFO: self.combine_bandwidth_info(com_info_dict, total_rank_dict[rank_id][com_info]) - self.compute_time_ratio(total_rank_dict[rank_id][Constant.COMMUNICATION_TIME_INFO]) - self.compute_bandwidth_ratio(total_rank_dict[rank_id][Constant.COMMUNICATION_BANDWIDTH_INFO]) + for rank_id in total_rank_dict: + self.compute_time_ratio(total_rank_dict[rank_id][Constant.COMMUNICATION_TIME_INFO]) + self.compute_bandwidth_ratio(total_rank_dict[rank_id][Constant.COMMUNICATION_BANDWIDTH_INFO]) comm_ops[Constant.TOTAL_OP_INFO] = total_rank_dict def combine_time_info(self, com_info_dict: dict, total_time_info_dict: dict): diff --git a/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py b/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py index 44e373731..2870048a5 100644 --- a/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py +++ b/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py @@ -14,6 +14,7 @@ # limitations under the License. from collections import defaultdict +from common_func.file_manager import FileManager import os @@ -25,9 +26,7 @@ class PytorchDataPreprocessor: self.path = os.path.realpath(path) def get_data_map(self) -> dict: - if not os.path.exists(self.path) or not os.access(self.path, os.R_OK): - print('[Error]path:{} not exist or not accessable.'.format(self.path)) - return dict() + FileManager.check_file_or_directory_path(self.path, isdir=True) collector_dirs = [dir_name for dir_name in os.listdir(self.path) if os.path.isdir(os.path.join(self.path, dir_name))] ascend_pt_dirs = [dir_name for dir_name in collector_dirs if dir_name.endswith("ascend_pt")] @@ -37,7 +36,7 @@ class PytorchDataPreprocessor: rank_id = self.get_rank_id(dir_name) if rank_id < 0: print('[Error]fail to get rankid or rankid invalid.') - + continue rank_id_map[rank_id].append(dir_name) ret_dict = dict() diff --git a/profiler/cluster_analyse/common_func/constant.py b/profiler/cluster_analyse/common_func/constant.py index aa3f6b7a9..84d4a9e05 100644 --- a/profiler/cluster_analyse/common_func/constant.py +++ b/profiler/cluster_analyse/common_func/constant.py @@ -27,6 +27,7 @@ class Constant(object): DIR_AUTHORITY = 0o750 MAX_JSON_SIZE = 1024 * 1024 * 1024 * 10 MAX_CSV_SIZE = 1024 * 1024 * 1024 * 5 + MAX_PATH_LENGTH = 4096 # communication P2P = "p2p" diff --git a/profiler/cluster_analyse/common_func/file_manager.py b/profiler/cluster_analyse/common_func/file_manager.py index 5ee9197bf..3ac6b843a 100644 --- a/profiler/cluster_analyse/common_func/file_manager.py +++ b/profiler/cluster_analyse/common_func/file_manager.py @@ -33,6 +33,18 @@ class FileManager: Exception Description: when invalid data throw exception """ + if not os.access(path, os.R_OK): + raise RuntimeError( + 'The path {} does not have permission to read. Please check the path permission'.format(path)) + + if len(path) > Constant.MAX_PATH_LENGTH: + msg = f"The length of file path exceeded the maximum value {Constant.MAX_PATH_LENGTH}: {path}" + raise RuntimeError(msg) + + if os.path.islink(path): + msg = f"Invalid profiling path is soft link: {path}" + raise RuntimeError(msg) + if isdir: if not os.path.exists(path): raise RuntimeError('The path {} is not exist.'.format(path)) @@ -47,10 +59,6 @@ class FileManager: if not os.path.isfile(path): raise RuntimeError('{} is an invalid file or non-exist.'.format(path)) - if not os.access(path, os.R_OK): - raise RuntimeError( - 'The path {} does not have permission to read. Please check the path permission'.format(path)) - @classmethod def read_csv_file(cls, file_path: str, class_bean: any) -> list: cls.check_file_or_directory_path(file_path) diff --git a/profiler/cluster_analyse/communication_group/communication_group_generator.py b/profiler/cluster_analyse/communication_group/communication_group_generator.py index bb156b2c8..0202ea0e5 100644 --- a/profiler/cluster_analyse/communication_group/communication_group_generator.py +++ b/profiler/cluster_analyse/communication_group/communication_group_generator.py @@ -70,12 +70,14 @@ class CommunicationGroupGenerator: stage_group = {} for rank_set in self.collective_group_dict.values(): unioned_set = {} + remove_key = [] for first_rank, stage in stage_group.items(): - if UnionFind.is_connectedt(rank_set, stage): - unioned_set = UnionFind.union(rank_set, stage) - del stage_group[first_rank] - break + if UnionFind.is_connected(rank_set, stage): + unioned_set = UnionFind.union(rank_set, stage, unioned_set) + remove_key.append(first_rank) if unioned_set: + for key in remove_key: + del stage_group[key] stage_group[min(unioned_set)] = unioned_set else: stage_group[min(rank_set)] = rank_set @@ -108,9 +110,9 @@ class CommunicationGroupGenerator: class UnionFind(object): """Disjoint Set Union""" @classmethod - def union(cls, p: set, q: set): + def union(cls, p: set, q: set, o: set): """make p and q the same set""" - return p | q + return p | q | o @classmethod def is_connected(cls, p: set, q: set): -- Gitee