From d70b23e7433a0b64b8cc99ab6d243f228a0002bc Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 09:20:17 +0800 Subject: [PATCH 01/24] cluster analyze --- .../analysis/analysis_facade.py | 14 +- .../analysis/communication_analysis.py | 120 ++++++++++++++++++ .../analysis/step_trace_time_analysis.py | 90 +++++++++++++ profiler/cluster_analyse/cluster_analysis.py | 19 ++- .../cluster_analyse/common_func/constant.py | 26 +++- .../common_func/file_manager.py | 37 +++--- .../communication_group_generator.py | 90 ++++++++++++- .../cluster_analyse/prof_bean/__init__.py | 14 ++ .../prof_bean/step_trace_time_bean.py | 39 ++++++ 9 files changed, 410 insertions(+), 39 deletions(-) create mode 100644 profiler/cluster_analyse/analysis/communication_analysis.py create mode 100644 profiler/cluster_analyse/analysis/step_trace_time_analysis.py create mode 100644 profiler/cluster_analyse/prof_bean/__init__.py create mode 100644 profiler/cluster_analyse/prof_bean/step_trace_time_bean.py diff --git a/profiler/cluster_analyse/analysis/analysis_facade.py b/profiler/cluster_analyse/analysis/analysis_facade.py index e6966d4402..13a154df83 100644 --- a/profiler/cluster_analyse/analysis/analysis_facade.py +++ b/profiler/cluster_analyse/analysis/analysis_facade.py @@ -15,19 +15,17 @@ from cluster_data_preprocess.pytorch_data_preprocessor import PytorchDataPreprocessor from communication_group.communication_group_generator import CommunicationGroupGenerator +from common_func.file_manager import FileManager +from common_func.constant import Constant class AnalysisFacade: analysis_module = {} - def __init__(self, collection_path: str, data_map: dict, communication_group: dict): - self.collection_path = collection_path - self.data_map = data_map - self.communication_group = communication_group + def __init__(self, param: dict): + self.param = param def cluster_analyze(self): - data_map = PytorchDataPreprocessor(self.collection_path).get_data_map() - if not data_map: - print("Can not get rank info or profiling data.") - communication_group = CommunicationGroupGenerator(self.collection_path, data_map).generate() + + pass diff --git a/profiler/cluster_analyse/analysis/communication_analysis.py b/profiler/cluster_analyse/analysis/communication_analysis.py new file mode 100644 index 0000000000..b59a7860b2 --- /dev/null +++ b/profiler/cluster_analyse/analysis/communication_analysis.py @@ -0,0 +1,120 @@ +# Copyright (c) 2023, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from common_func.constant import Constant +from collections import defaultdict +from common_func.file_manager import FileManager + + +class CommunicationAnalysis: + CLUSTER_COMMUNICATION_JSON = "cluster_commnunication.json" + + def __init__(self, param: dict): + self.collection_path = param.get(Constant.COLLECTION_PATH) + self.data_map = param.get(Constant.DATA_MAP) + self.collective_group_dict = param.get(Constant.COLLECTIVE_GROUP) + self.communication_ops = param.get(Constant.COMMUNICATION_OPS) + self.comm_ops_struct = {} + + def run(self): + self.split_op_by_group() + self.combine_ops_total_info() + self.dump_data() + + def dump_data(self): + if self.comm_ops_struct: + print("There is no final comm ops data generated") + return + FileManager.create_json_file(self.collection_path, self.comm_ops_struct, self.CLUSTER_COMMUNICATION_JSON) + + def split_op_by_group(self): + for single_op in self.communication_ops: + if single_op.get(Constant.COMM_OP_TYPE) == Constant.P2P: + rank_tup = Constant.P2P + else: + rank_tup = tuple(self.collective_group_dict.get(single_op.get(Constant.GROUP_NAME), [])) + rank_id = single_op.get(Constant.RANK_ID, 'N/A') + step_id = single_op.get(Constant.STEP_ID, 'N/A') + op_name = single_op.get(Constant.COMM_OP_NAME, 'N/A') + op_info = single_op.get(Constant.COMM_OP_INFO) + self.comm_ops_struct.setdefault(rank_tup, {}).setdefault(step_id, {}).\ + setdefault(op_name, {}).setdefault(rank_id, op_info) + + def combine_ops_total_info(self): + for rank_tup, group_dict in self.comm_ops_struct.items(): + for step_id, communication_ops in group_dict.items(): + self.compute_total_info(communication_ops) + + def compute_total_info(self, comm_ops: dict): + if not comm_ops: + return + total_rank_dict = {} + for communication_op, rank_dict in comm_ops.items(): + for rank_id, communication_op_info in rank_dict.items(): + total_rank_dict.setdefault(rank_id, {}).setdefault(self.COMMUNICATION_TIME_INFO, defaultdict(float)) + total_rank_dict.setdefault(rank_id, {}).setdefault(self.COMMUNICATION_BANDWIDTH_INFO, {}) + for com_info, com_info_dict in communication_op_info.items(): + if com_info == self.COMMUNICATION_TIME_INFO: + self.combine_time_info(com_info_dict, total_rank_dict[rank_id][com_info]) + if com_info == self.COMMUNICATION_BANDWIDTH_INFO: + self.combine_bandwidth_info(com_info_dict, total_rank_dict[rank_id][com_info]) + self.compute_time_ratio(total_rank_dict[rank_id][self.COMMUNICATION_TIME_INFO]) + self.compute_bandwidth_ratio(total_rank_dict[rank_id][self.COMMUNICATION_BANDWIDTH_INFO]) + comm_ops[Constant.TOTAL_OP_INFO] = total_rank_dict + + def combine_time_info(self, com_info_dict: dict, total_time_info_dict: dict): + ratio_list = [self.WAIT_TIME_RATIO, self.SYNCHRONIZATION_TIME_RATIO] + for time_info in com_info_dict: + if time_info not in ratio_list and time_info != self.START_TIMESTAMP: + total_time_info_dict[time_info] += com_info_dict.get(time_info) + + def combine_bandwidth_info(self, com_info_dict: dict, total_bandwidth_info_dict: dict): + add_list = [self.TRANSIT_TIME_MS, self.TRANSIT_SIZE_MB] + dict_list = [self.SIZE_DISTRIBUTION] + for transport_type, part_transport_dict in com_info_dict.items(): + if transport_type not in total_bandwidth_info_dict: + total_bandwidth_info_dict[transport_type] = { + self.TRANSIT_TIME_MS: 0, + self.TRANSIT_SIZE_MB: 0, + self.SIZE_DISTRIBUTION: defaultdict(lambda: [0, 0]) + } + for bandwidth_msg, value in part_transport_dict.items(): + if bandwidth_msg in add_list: + total_bandwidth_info_dict[transport_type][bandwidth_msg] += 
value + if bandwidth_msg in dict_list: + self.combine_size_distribution(value, total_bandwidth_info_dict[transport_type][bandwidth_msg]) + + def compute_time_ratio(self, total_time_info_dict: dict): + if total_time_info_dict[self.WAIT_TIME_MS] + total_time_info_dict[self.TRANSIT_TIME_MS] == 0: + total_time_info_dict[self.WAIT_TIME_RATIO] = 0 + else: + total_time_info_dict[self.WAIT_TIME_RATIO] = \ + round(total_time_info_dict[self.WAIT_TIME_MS] / + (total_time_info_dict[self.WAIT_TIME_MS] + total_time_info_dict[self.TRANSIT_TIME_MS]), 4) + if total_time_info_dict[self.SYNCHRONIZATION_TIME_MS] + total_time_info_dict[self.TRANSIT_TIME_MS] == 0: + total_time_info_dict[self.SYNCHRONIZATION_TIME_RATIO] = 0 + else: + total_time_info_dict[self.SYNCHRONIZATION_TIME_RATIO] = \ + round(total_time_info_dict[self.WAIT_TIME_MS] / + (total_time_info_dict[self.SYNCHRONIZATION_TIME_MS] + + total_time_info_dict[self.TRANSIT_TIME_MS]), 4) + + def compute_bandwidth_ratio(self, total_bandwidth_info_dict: dict): + for transport_type, bandwidth_dict in total_bandwidth_info_dict.items(): + if bandwidth_dict[self.TRANSIT_TIME_MS] == 0: + bandwidth_dict[self.BANDWIDTH_GB_S] = 0 + else: + bandwidth_dict[self.BANDWIDTH_GB_S] = \ + round(bandwidth_dict[self.TRANSIT_SIZE_MB] / bandwidth_dict[self.TRANSIT_TIME_MS], 4) diff --git a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py new file mode 100644 index 0000000000..4b280e88af --- /dev/null +++ b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py @@ -0,0 +1,90 @@ +# Copyright (c) 2023, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os + +from common_func.constant import Constant +from collections import defaultdict +from common_func.file_manager import FileManager +from prof_bean.step_trace_time_bean import StepTraceTimeBean + + +class StepTraceTimeAnalysis: + CLUSTER_TRACE_TIME_CSV = "cluster_step_trace_time.csv" + + def __init__(self, param: dict): + self.collection_path = param.get(Constant.COLLECTION_PATH) + self.data_map = param.get(Constant.DATA_MAP) + self.communication_group = param.get(Constant.COMMUNICATION_GROUP) + self.step_time_dict = {} + self.step_data_list = [] + + @staticmethod + def get_max_data_row(data_group_list: list): + if not data_group_list: + return [] + ret = [] + for idx in len(data_group_list[0]): + max_val = 0 + for idy in len(data_group_list): + max_val = max(max_val, data_group_list[idy][idx]) + ret.append(max_val) + return ret + + def run(self): + self.load_step_trace_time_data() + self.analyze_step_time() + self.dump_data() + + def dump_data(self): + if not self.step_data_list: + print("Can't get step time info!") + self.step_data_list = [row[:3].extend(row[4:]) for row in self.step_data_list] + headers = self.get_headers() + FileManager.create_csv_file(self.collection_path, self.step_data_list, self.CLUSTER_TRACE_TIME_CSV, headers) + + def load_step_trace_time_data(self): + for rank_id, profiling_dir_path in self.data_map: + step_time_file = os.path.join(profiling_dir_path, Constant.SINGLE_OUTPUT, Constant.STEP_TIME_CSV) + if step_time_file: + self.step_time_dict[rank_id] = FileManager.read_csv_file(step_time_file, StepTraceTimeBean) + if not self.step_time_dict.get(rank_id): + print(f"rank {rank_id} does not have a valid step_trace_time.json.") + + def analyze_step_time(self, ): + for rank_id, data_bean in self.step_time_dict.items(): + self.step_data_list.append([data_bean.step, Constant.rank, rank_id] + data_bean.row) + stage_list = self.communication_group.get(Constant.P2P) + if not stage_list: + return + step_group_dict = {} + for data_list in self.step_data_list: + stage_group = 'None' + for stage in stage_list: + if data_list[1] in stage: + stage_group = stage + break + key = (data_list[0], stage_group) + step_group_dict.setdefault(key, []).append(data_list[3:]) + + for key, data_group_list in step_group_dict.items(): + self.step_time_dict.append([key[0], Constant.STAGE, key[1]] + self.get_max_data_row(data_group_list)) + + def get_headers(self): + if self.step_time_dict: + for rank in self.step_time_dict: + if self.step_time_dict.get(rank): + return self.step_time_dict[rank][0].all_headers + return [] diff --git a/profiler/cluster_analyse/cluster_analysis.py b/profiler/cluster_analyse/cluster_analysis.py index d1daea002b..e2e8452e65 100644 --- a/profiler/cluster_analyse/cluster_analysis.py +++ b/profiler/cluster_analyse/cluster_analysis.py @@ -16,6 +16,8 @@ import argparse from cluster_data_preprocess.pytorch_data_preprocessor import PytorchDataPreprocessor from communication_group.communication_group_generator import CommunicationGroupGenerator +from common_func.constant import Constant +from common_func.file_manager import FileManager class Interface: @@ -25,8 +27,23 @@ class Interface: self.communication_group = {} def run(self): + FileManager.remove_and_make_output_dir(self.param.get(Constant.COLLECTION_PATH)) data_map = PytorchDataPreprocessor(self.collection_path).get_data_map() - communication_group = CommunicationGroupGenerator(self.collection_path, data_map).generate() + if not data_map: + print("Can not get rank info or profiling data.") + return + 
communication_group, collective_group_dict, communication_ops = \ + CommunicationGroupGenerator(self.collection_path, data_map).generate() + if not collective_group_dict: + print("Can not get communication info from ranks") + return + params = { + Constant.COLLECTION_PATH: self.collection_path, + Constant.DATA_MAP: data_map, + Constant.COLLECTIVE_GROUP: collective_group_dict, + Constant.COMMUNICATION_OPS: communication_ops, + Constant.COMMUNICATION_GROUP: communication_group + } if __name__ == "__main__": diff --git a/profiler/cluster_analyse/common_func/constant.py b/profiler/cluster_analyse/common_func/constant.py index e6cc07eb6e..7fc82dc834 100644 --- a/profiler/cluster_analyse/common_func/constant.py +++ b/profiler/cluster_analyse/common_func/constant.py @@ -17,12 +17,34 @@ class Constant(object): # dir name FRAMEWORK_DIR = "FRAMEWORK" - OUTPUT_DIR = "ASCEND_PROFILER_OUTPUT" + CLUSTER_ANALYSIS_OUTPUT = "cluster_analysis_output" + SINGLE_OUTPUT = "ASCEND_PROFILER_OUTPUT" COMM_JSON = "communication.json" - STEP_TIME_CSV == "step_time.csv" + STEP_TIME_CSV = "step_trace_time.csv" # file authority FILE_AUTHORITY = 0o640 DIR_AUTHORITY = 0o750 MAX_JSON_SIZE = 1024 * 1024 * 1024 * 10 MAX_CSV_SIZE = 1024 * 1024 * 1024 * 5 + + # communication + P2P = "p2p" + COLLECTIVE = "collective" + STEP_ID = "step_id" + RANK_ID = "rank_id" + GROUP_NAME = "group_name" + COMM_OP_TYPE = "comm_op_type" + COMM_OP_NAME = "comm_op_name" + COMM_OP_INFO = "comm_op_info" + TOTAL_OP_INFO = "Total Op Info" + + # params + DATA_MAP = "data_map" + COLLECTIVE_GROUP = "collective_group" + COMMUNICATION_OPS = "communication_ops" + COLLECTION_PATH = 'collection_path' + + # step time + RANK = 'rank' + STAGE = 'stage' diff --git a/profiler/cluster_analyse/common_func/file_manager.py b/profiler/cluster_analyse/common_func/file_manager.py index 6f0c9f99e9..8ed02f8e0e 100644 --- a/profiler/cluster_analyse/common_func/file_manager.py +++ b/profiler/cluster_analyse/common_func/file_manager.py @@ -78,7 +78,7 @@ class FileManager: if file_size <= 0: return {} if file_size > Constant.MAX_JSON_SIZE: - print(f"The file size exceeds the preset value {Constant.MAX_CSV_SIZE / 1024 / 1024}MB, " + print(f"The file size exceeds the preset value {Constant.MAX_JSON_SIZE / 1024 / 1024}MB, " f"please check the file: {file_path}") return {} try: @@ -92,49 +92,44 @@ class FileManager: def create_csv_file(cls, profiler_path: str, data: list, file_name: str, headers: list = None) -> None: if not data: return - file_path = os.path.join(profiler_path, Constant.OUTPUT_DIR, file_name) + output_path = os.path.join(profiler_path, Constant.CLUSTER_ANALYSIS_OUTPUT) + output_file = os.path.join(output_path, file_name) + cls.check_file_or_directory_path(output_path) try: - with os.fdopen(os.open(file_path, os.O_WRONLY | os.O_CREAT, Constant.FILE_AUTHORITY), "w", + with os.fdopen(os.open(output_file, os.O_WRONLY | os.O_CREAT, Constant.FILE_AUTHORITY), "w", newline="") as file: writer = csv.writer(file) if headers: writer.writerow(headers) writer.writerows(data) except Exception: - raise RuntimeError(f"Can't create file: {file_path}") + raise RuntimeError(f"Can't create file: {output_file}") @classmethod def create_json_file(cls, profiler_path: str, data: list, file_name: str) -> None: if not data: return - file_path = os.path.join(profiler_path, Constant.OUTPUT_DIR, file_name) - cls.create_json_file_by_path(file_path, data) - - @classmethod - def create_json_file_by_path(cls, output_path: str, data: list) -> None: - dir_name = os.path.dirname(output_path) - if 
not os.path.exists(dir_name): - try: - os.makedirs(dir_name, mode=Constant.DIR_AUTHORITY) - except Exception: - raise RuntimeError(f"Can't create directory: {dir_name}") + output_path = os.path.join(profiler_path, Constant.CLUSTER_ANALYSIS_OUTPUT) + output_file = os.path.join(output_path, file_name) + cls.check_file_or_directory_path(os.path.join(output_path, Constant.CLUSTER_ANALYSIS_OUTPUT)) try: - with os.fdopen(os.open(output_path, os.O_WRONLY | os.O_CREAT, Constant.FILE_AUTHORITY), "w") as file: + with os.fdopen(os.open(output_file, os.O_WRONLY | os.O_CREAT, Constant.FILE_AUTHORITY), "w") as file: json.dump(data, file) except Exception: - raise RuntimeError(f"Can't create file: {output_path}") + raise RuntimeError(f"Can't create the file: {output_file}") @classmethod - def remove_and_make_output_dir(cls, profiler_path) -> None: - output_path = os.path.join(profiler_path, Constant.OUTPUT_DIR) + def create_output_dir(cls, collection_path: str) -> None: + output_path = os.path.join(collection_path, Constant.CLUSTER_ANALYSIS_OUTPUT) if os.path.isdir(output_path): try: + cls.check_file_or_directory_path(output_path) shutil.rmtree(output_path) os.makedirs(output_path, mode=Constant.DIR_AUTHORITY) except Exception: - raise RuntimeError(f"Can't delete files in the directory: {output_path}") + raise RuntimeError(f"Can't delete the directory: {output_path}") return try: os.makedirs(output_path, mode=Constant.DIR_AUTHORITY) except Exception: - raise RuntimeError(f"Can't create directory: {output_path}") + raise RuntimeError(f"Can't create the directory: {output_path}") diff --git a/profiler/cluster_analyse/communication_group/communication_group_generator.py b/profiler/cluster_analyse/communication_group/communication_group_generator.py index e95175b0b3..f637297a8b 100644 --- a/profiler/cluster_analyse/communication_group/communication_group_generator.py +++ b/profiler/cluster_analyse/communication_group/communication_group_generator.py @@ -16,31 +16,107 @@ import os from common_func.constant import Constant from common_func.file_manager import FileManager +from collections import defaultdict + class CommunicationGroupGenerator: + COMMUNICATION_GROUP_JSON = "communication_group.json" + def __init__(self, collection_path: str, data_map: dict): self.collection_path = collection_path self.data_map = data_map self.communication_group = {} + self.collective_group_dict = defaultdict(list) + self.p2p_group_dict = defaultdict(list) self.rank_comm_dir_dict = {} + self.communication_ops = [] def generate(self): self.load_communication_json() + self.analyze_communication_ops() + self.generate_collective_communication_group() + self.generate_p2p_communication_group() + FileManager.create_json_file(self.collection_path, self.communication_group, self.COMMUNICATION_GROUP_JSON) + return self.communication_group, self.collective_group_dict, self.communication_ops + + def analyze_communication_ops(self): + for rank_id, rank_id_dict in self.rank_comm_dir_dict.items(): + for step_id, step_id_dict in rank_id_dict.items(): + if isinstance(step_id_dict, dict): + print(f"rank{rank_id}'s communication.json has a wrong data struct.") + continue + for comm_op_type, comm_op_dict in step_id_dict.items(): + if comm_op_type == Constant.COLLECTIVE: + self.get_collective_ops_name(rank_id, comm_op_dict) + elif comm_op_type == Constant.P2P: + pass + else: + print(f"rank{rank_id}'s communication.json has no p2p or collective.") + continue + self.add_communication_ops(rank_id, step_id, comm_op_type, comm_op_dict) def 
load_communication_json(self):
-        for rank_id, profiling_dir_path in self.data_map:
-            comm_dir = profiling_dir_path.get(Constant.COMM_JSON)
+        for rank_id, profiling_dir_path in self.data_map:
+            comm_dir = os.path.join(profiling_dir_path, Constant.SINGLE_OUTPUT, Constant.COMM_JSON)
             if comm_dir:
                 self.rank_comm_dir_dict[rank_id] = FileManager.read_json_file(comm_dir)
                 if not self.rank_comm_dir_dict.get(rank_id):
-                    print(f"rank {rank_id} does not have a valid communication.json")
-
+                    print(f"rank {rank_id} does not have a valid communication.json.")
 
     def generate_collective_communication_group(self):
-        pass
+        self.communication_group[Constant.COLLECTIVE] = list(self.collective_group_dict.values())
 
     def generate_p2p_communication_group(self):
-        pass
+        stage_group = {}
+        for rank_list in self.collective_group_dict.values():
+            unioned_set = {}
+            for first_rank, stage in stage_group.items():
+                if UnionFind.is_connected(set(rank_list), stage):
+                    unioned_set = UnionFind.union(set(rank_list), stage)
+                    del stage_group[first_rank]
+                    break
+            if unioned_set:
+                stage_group[min(unioned_set)] = unioned_set
+            else:
+                stage_group[min(rank_list)] = set(rank_list)
+        first_rank_sort_list = sorted([first_rank for first_rank in stage_group])
+        self.communication_group[Constant.P2P] = [stage_group.get(first_rank) for first_rank in first_rank_sort_list]
+
+    def get_collective_ops_name(self, rank_id: int, comm_op_dict: dict):
+        for comm_op in comm_op_dict:
+            if comm_op.startswith('Total'):
+                continue
+            group_name = comm_op.split('@')[-1]
+            self.collective_group_dict[group_name].append(rank_id)
+
+    def add_communication_ops(self, rank_id: str, step_id: str, comm_op_type: str, comm_op_dict: dict):
+        for comm_op in comm_op_dict:
+            if comm_op.startswith('Total'):
+                continue
+            group_name = comm_op.split('@')[-1]
+            self.communication_ops.append({
+                Constant.RANK_ID: rank_id,
+                Constant.STEP_ID: step_id,
+                Constant.COMM_OP_TYPE: comm_op_type,
+                Constant.COMM_OP_NAME: comm_op,
+                Constant.GROUP_NAME: group_name,
+                Constant.COMM_OP_INFO: comm_op_dict.get(comm_op)
+            })
+
+
+class UnionFind(object):
+    """Disjoint Set Union"""
+    @classmethod
+    def union(cls, p: set, q: set):
+        """make p and q the same set"""
+        return p | q
 
-    def get_all_collective_ops_name(self):
-        pass
+    @classmethod
+    def is_connected(cls, p: set, q: set):
+        """
+        check whether set p and set q are connected
+        """
+        if p & q:
+            return True
+        else:
+            return False
diff --git a/profiler/cluster_analyse/prof_bean/__init__.py b/profiler/cluster_analyse/prof_bean/__init__.py
new file mode 100644
index 0000000000..8400fd5ecd
--- /dev/null
+++ b/profiler/cluster_analyse/prof_bean/__init__.py
@@ -0,0 +1,14 @@
+# Copyright (c) 2023, Huawei Technologies Co., Ltd.
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
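# A standalone sketch of the stage-grouping idea behind
# generate_p2p_communication_group() above: collective rank sets that share
# at least one rank are folded into a single pipeline stage, which is what
# UnionFind models. The rank sets below are hypothetical examples, and
# group_stages() is an illustrative helper, not part of this patch.

def group_stages(rank_sets: list) -> list:
    stage_group = {}
    for rank_set in rank_sets:
        rank_set = set(rank_set)
        merged = set(rank_set)
        # merge every existing stage that shares a rank with this set
        for first_rank in [k for k, s in stage_group.items() if s & rank_set]:
            merged |= stage_group.pop(first_rank)
        stage_group[min(merged)] = merged
    return [sorted(stage_group[k]) for k in sorted(stage_group)]

# {0, 1} and {1, 2} share rank 1 and collapse into one stage:
# group_stages([{0, 1}, {1, 2}, {4, 5}]) -> [[0, 1, 2], [4, 5]]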
diff --git a/profiler/cluster_analyse/prof_bean/step_trace_time_bean.py b/profiler/cluster_analyse/prof_bean/step_trace_time_bean.py new file mode 100644 index 0000000000..b9e4de17e0 --- /dev/null +++ b/profiler/cluster_analyse/prof_bean/step_trace_time_bean.py @@ -0,0 +1,39 @@ +# Copyright (c) 2023, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class StepTraceTimeBean: + STEP = "Step" + COMPLEMENT_HEADER = ["Type", "Index"] + + def __init__(self, data: list): + self._data = data + + @property + def row(self) -> list: + row = [] + for field_name in self._data.keys(): + if field_name == self.STEP: + continue + row.append(self._data.get(field_name, )) + return row + + @property + def step(self) -> float: + return float(self._data.get(self.STEP, '')) + + @property + def all_headers(self) -> list: + return list(self._data.keys()[0] + self.COMPLEMENT_HEADER + self._data.keys()[1:]) -- Gitee From 0f9c2491b09529c1895e2be22fdec26cb7039b77 Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 09:25:17 +0800 Subject: [PATCH 02/24] cluster analyze --- .../cluster_data_preprocess/pytorch_data_preprocessor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py b/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py index 7e0487a2c3..44e3737319 100644 --- a/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py +++ b/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py @@ -16,6 +16,7 @@ from collections import defaultdict import os + class PytorchDataPreprocessor: PROFILER_INFO_HEAD = 'profiler_info_' PROFILER_INFO_EXTENSION = '.json' -- Gitee From 0fd612156c7402341c720c49fff8d8197b9c39c9 Mon Sep 17 00:00:00 2001 From: s00809967 Date: Tue, 15 Aug 2023 11:27:03 +0800 Subject: [PATCH 03/24] time --- profiler/merge_profiling_timeline/rank_id.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 profiler/merge_profiling_timeline/rank_id.py diff --git a/profiler/merge_profiling_timeline/rank_id.py b/profiler/merge_profiling_timeline/rank_id.py new file mode 100644 index 0000000000..e69de29bb2 -- Gitee From 354972b6780ed7331dd888a276b4d89caf76effd Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 11:35:46 +0800 Subject: [PATCH 04/24] cluster analyze --- profiler/cluster_analyse/cluster_analysis.py | 2 +- profiler/cluster_analyse/common_func/file_manager.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/profiler/cluster_analyse/cluster_analysis.py b/profiler/cluster_analyse/cluster_analysis.py index e2e8452e65..3ffa8c610a 100644 --- a/profiler/cluster_analyse/cluster_analysis.py +++ b/profiler/cluster_analyse/cluster_analysis.py @@ -27,7 +27,7 @@ class Interface: self.communication_group = {} def run(self): - FileManager.remove_and_make_output_dir(self.param.get(Constant.COLLECTION_PATH)) + 
FileManager.create_output_dir(self.collection_path) data_map = PytorchDataPreprocessor(self.collection_path).get_data_map() if not data_map: print("Can not get rank info or profiling data.") diff --git a/profiler/cluster_analyse/common_func/file_manager.py b/profiler/cluster_analyse/common_func/file_manager.py index 8ed02f8e0e..b380be37b4 100644 --- a/profiler/cluster_analyse/common_func/file_manager.py +++ b/profiler/cluster_analyse/common_func/file_manager.py @@ -94,7 +94,7 @@ class FileManager: return output_path = os.path.join(profiler_path, Constant.CLUSTER_ANALYSIS_OUTPUT) output_file = os.path.join(output_path, file_name) - cls.check_file_or_directory_path(output_path) + cls.check_file_or_directory_path(output_path, isdir=True) try: with os.fdopen(os.open(output_file, os.O_WRONLY | os.O_CREAT, Constant.FILE_AUTHORITY), "w", newline="") as file: @@ -111,7 +111,7 @@ class FileManager: return output_path = os.path.join(profiler_path, Constant.CLUSTER_ANALYSIS_OUTPUT) output_file = os.path.join(output_path, file_name) - cls.check_file_or_directory_path(os.path.join(output_path, Constant.CLUSTER_ANALYSIS_OUTPUT)) + cls.check_file_or_directory_path(os.path.join(output_path, Constant.CLUSTER_ANALYSIS_OUTPUT), isdir=True) try: with os.fdopen(os.open(output_file, os.O_WRONLY | os.O_CREAT, Constant.FILE_AUTHORITY), "w") as file: json.dump(data, file) @@ -123,7 +123,7 @@ class FileManager: output_path = os.path.join(collection_path, Constant.CLUSTER_ANALYSIS_OUTPUT) if os.path.isdir(output_path): try: - cls.check_file_or_directory_path(output_path) + cls.check_file_or_directory_path(output_path, isdir=True) shutil.rmtree(output_path) os.makedirs(output_path, mode=Constant.DIR_AUTHORITY) except Exception: -- Gitee From b18eb511636bc856455a8a11ef7a236f3009ada1 Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 15:35:16 +0800 Subject: [PATCH 05/24] cluster analyze --- .../cluster_analyse/common_func/constant.py | 3 ++- .../common_func/file_manager.py | 4 ++-- .../communication_group_generator.py | 21 ++++++++++--------- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/profiler/cluster_analyse/common_func/constant.py b/profiler/cluster_analyse/common_func/constant.py index 7fc82dc834..5532bfb9c7 100644 --- a/profiler/cluster_analyse/common_func/constant.py +++ b/profiler/cluster_analyse/common_func/constant.py @@ -43,7 +43,8 @@ class Constant(object): DATA_MAP = "data_map" COLLECTIVE_GROUP = "collective_group" COMMUNICATION_OPS = "communication_ops" - COLLECTION_PATH = 'collection_path' + COLLECTION_PATH = "collection_path" + COMMUNICATION_GROUP = "communication_group" # step time RANK = 'rank' diff --git a/profiler/cluster_analyse/common_func/file_manager.py b/profiler/cluster_analyse/common_func/file_manager.py index b380be37b4..5ee9197bf4 100644 --- a/profiler/cluster_analyse/common_func/file_manager.py +++ b/profiler/cluster_analyse/common_func/file_manager.py @@ -106,12 +106,12 @@ class FileManager: raise RuntimeError(f"Can't create file: {output_file}") @classmethod - def create_json_file(cls, profiler_path: str, data: list, file_name: str) -> None: + def create_json_file(cls, profiler_path: str, data: dict, file_name: str) -> None: if not data: return output_path = os.path.join(profiler_path, Constant.CLUSTER_ANALYSIS_OUTPUT) output_file = os.path.join(output_path, file_name) - cls.check_file_or_directory_path(os.path.join(output_path, Constant.CLUSTER_ANALYSIS_OUTPUT), isdir=True) + cls.check_file_or_directory_path(output_path, isdir=True) try: with 
os.fdopen(os.open(output_file, os.O_WRONLY | os.O_CREAT, Constant.FILE_AUTHORITY), "w") as file:
                 json.dump(data, file)
diff --git a/profiler/cluster_analyse/communication_group/communication_group_generator.py b/profiler/cluster_analyse/communication_group/communication_group_generator.py
index f637297a8b..bb156b2c85 100644
--- a/profiler/cluster_analyse/communication_group/communication_group_generator.py
+++ b/profiler/cluster_analyse/communication_group/communication_group_generator.py
@@ -26,7 +26,7 @@ class CommunicationGroupGenerator:
         self.collection_path = collection_path
         self.data_map = data_map
         self.communication_group = {}
-        self.collective_group_dict = defaultdict(list)
+        self.collective_group_dict = defaultdict(set)
         self.p2p_group_dict = defaultdict(list)
         self.rank_comm_dir_dict = {}
         self.communication_ops = []
@@ -42,7 +42,7 @@ class CommunicationGroupGenerator:
     def analyze_communication_ops(self):
         for rank_id, rank_id_dict in self.rank_comm_dir_dict.items():
             for step_id, step_id_dict in rank_id_dict.items():
-                if isinstance(step_id_dict, dict):
+                if not isinstance(step_id_dict, dict):
                     print(f"rank{rank_id}'s communication.json has a wrong data struct.")
                     continue
                 for comm_op_type, comm_op_dict in step_id_dict.items():
@@ -56,7 +56,7 @@ class CommunicationGroupGenerator:
     def load_communication_json(self):
-        for rank_id, profiling_dir_path in self.data_map:
+        for rank_id, profiling_dir_path in self.data_map.items():
             comm_dir = os.path.join(profiling_dir_path, Constant.SINGLE_OUTPUT, Constant.COMM_JSON)
             if comm_dir:
                 self.rank_comm_dir_dict[rank_id] = FileManager.read_json_file(comm_dir)
@@ -64,30 +64,31 @@ class CommunicationGroupGenerator:
                 print(f"rank {rank_id} does not have a valid communication.json.")
 
     def generate_collective_communication_group(self):
-        self.communication_group[Constant.COLLECTIVE] = list(self.collective_group_dict.values())
+        self.communication_group[Constant.COLLECTIVE] = [list(group) for group in self.collective_group_dict.values()]
 
     def generate_p2p_communication_group(self):
         stage_group = {}
-        for rank_list in self.collective_group_dict.values():
+        for rank_set in self.collective_group_dict.values():
             unioned_set = {}
             for first_rank, stage in stage_group.items():
-                if UnionFind.is_connected(set(rank_list), stage):
-                    unioned_set = UnionFind.union(set(rank_list), stage)
+                if UnionFind.is_connected(rank_set, stage):
+                    unioned_set = UnionFind.union(rank_set, stage)
                     del stage_group[first_rank]
                     break
             if unioned_set:
                 stage_group[min(unioned_set)] = unioned_set
             else:
-                stage_group[min(rank_list)] = set(rank_list)
+                stage_group[min(rank_set)] = rank_set
         first_rank_sort_list = sorted([first_rank for first_rank in stage_group])
-        self.communication_group[Constant.P2P] = [stage_group.get(first_rank) for first_rank in first_rank_sort_list]
+        self.communication_group[Constant.P2P] = \
+            [list(stage_group.get(first_rank, {})) for first_rank in first_rank_sort_list]
 
     def get_collective_ops_name(self, rank_id: int, comm_op_dict: dict):
         for comm_op in comm_op_dict:
             if comm_op.startswith('Total'):
                 continue
             group_name = comm_op.split('@')[-1]
-            self.collective_group_dict[group_name].append(rank_id)
+            self.collective_group_dict[group_name].add(rank_id)
 
     def add_communication_ops(self, rank_id: str, step_id: str, comm_op_type: str, comm_op_dict: dict):
         for comm_op in comm_op_dict:
-- Gitee

From 44fab58466dc9b876b99e6d6735d0f8bd3c27556 Mon Sep 17 00:00:00 2001
From: sunboquan
Date: Tue, 15 Aug 2023 16:26:37
+0800 Subject: [PATCH 06/24] cluster analyze --- .../cluster_analyse/analysis/analysis_facade.py | 13 +++++-------- profiler/cluster_analyse/cluster_analysis.py | 2 ++ 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/profiler/cluster_analyse/analysis/analysis_facade.py b/profiler/cluster_analyse/analysis/analysis_facade.py index 13a154df83..a09a065a5d 100644 --- a/profiler/cluster_analyse/analysis/analysis_facade.py +++ b/profiler/cluster_analyse/analysis/analysis_facade.py @@ -13,19 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -from cluster_data_preprocess.pytorch_data_preprocessor import PytorchDataPreprocessor -from communication_group.communication_group_generator import CommunicationGroupGenerator -from common_func.file_manager import FileManager -from common_func.constant import Constant +from analysis.communication_analysis import CommunicationAnalysis +from analysis.step_trace_time_analysis import StepTraceTimeAnalysis class AnalysisFacade: - analysis_module = {} + analysis_module = {CommunicationAnalysis, StepTraceTimeAnalysis} def __init__(self, param: dict): self.param = param def cluster_analyze(self): - - pass - + for analysis in self.analysis_module: + analysis.run() diff --git a/profiler/cluster_analyse/cluster_analysis.py b/profiler/cluster_analyse/cluster_analysis.py index 3ffa8c610a..861bdeac65 100644 --- a/profiler/cluster_analyse/cluster_analysis.py +++ b/profiler/cluster_analyse/cluster_analysis.py @@ -18,6 +18,7 @@ from cluster_data_preprocess.pytorch_data_preprocessor import PytorchDataPreproc from communication_group.communication_group_generator import CommunicationGroupGenerator from common_func.constant import Constant from common_func.file_manager import FileManager +from analysis.analysis_facade import AnalysisFacade class Interface: @@ -44,6 +45,7 @@ class Interface: Constant.COMMUNICATION_OPS: communication_ops, Constant.COMMUNICATION_GROUP: communication_group } + AnalysisFacade(params).cluster_analyze() if __name__ == "__main__": -- Gitee From 2623ff35575e5e3e3d4d7c27b957e2382dc684ec Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 16:29:55 +0800 Subject: [PATCH 07/24] cluster analyze --- .../cluster_analyse/analysis/analysis_facade.py | 2 +- profiler/cluster_analyse/common_func/constant.py | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/profiler/cluster_analyse/analysis/analysis_facade.py b/profiler/cluster_analyse/analysis/analysis_facade.py index a09a065a5d..ff352c94e2 100644 --- a/profiler/cluster_analyse/analysis/analysis_facade.py +++ b/profiler/cluster_analyse/analysis/analysis_facade.py @@ -25,4 +25,4 @@ class AnalysisFacade: def cluster_analyze(self): for analysis in self.analysis_module: - analysis.run() + analysis(self.param).run() diff --git a/profiler/cluster_analyse/common_func/constant.py b/profiler/cluster_analyse/common_func/constant.py index 5532bfb9c7..aa3f6b7a9e 100644 --- a/profiler/cluster_analyse/common_func/constant.py +++ b/profiler/cluster_analyse/common_func/constant.py @@ -38,6 +38,20 @@ class Constant(object): COMM_OP_NAME = "comm_op_name" COMM_OP_INFO = "comm_op_info" TOTAL_OP_INFO = "Total Op Info" + COMMUNICATION_TIME_INFO = "Communication Time Info" + START_TIMESTAMP = "Start Timestamp(us)" + COMMUNICATION_BANDWIDTH_INFO = "Communication Bandwidth Info" + HCOM_SEND = "hcom_send" + HCOM_RECEIVE = "hcom_receive" + SYNCHRONIZATION_TIME_RATIO = "Synchronization Time Ratio" + SYNCHRONIZATION_TIME_MS = 
"Synchronization Time(ms)" + WAIT_TIME_RATIO = "Wait Time Ratio" + TRANSIT_TIME_MS = "Transit Time(ms)" + TRANSIT_SIZE_MB = "Transit Size(MB)" + SIZE_DISTRIBUTION = "Size Distribution" + WAIT_TIME_MS = "Wait Time(ms)" + BANDWIDTH_GB_S = "Bandwidth(GB/s)" + COMMUNICATION = "communication.json" # params DATA_MAP = "data_map" -- Gitee From 52d671a4dd7c1192de3ce5ca707a6e7bc817dd14 Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 16:32:11 +0800 Subject: [PATCH 08/24] cluster analyze --- .../analysis/communication_analysis.py | 56 +++++++++---------- .../analysis/step_trace_time_analysis.py | 2 +- 2 files changed, 29 insertions(+), 29 deletions(-) diff --git a/profiler/cluster_analyse/analysis/communication_analysis.py b/profiler/cluster_analyse/analysis/communication_analysis.py index b59a7860b2..040ebbff55 100644 --- a/profiler/cluster_analyse/analysis/communication_analysis.py +++ b/profiler/cluster_analyse/analysis/communication_analysis.py @@ -63,32 +63,32 @@ class CommunicationAnalysis: total_rank_dict = {} for communication_op, rank_dict in comm_ops.items(): for rank_id, communication_op_info in rank_dict.items(): - total_rank_dict.setdefault(rank_id, {}).setdefault(self.COMMUNICATION_TIME_INFO, defaultdict(float)) - total_rank_dict.setdefault(rank_id, {}).setdefault(self.COMMUNICATION_BANDWIDTH_INFO, {}) + total_rank_dict.setdefault(rank_id, {}).setdefault(Constant.COMMUNICATION_TIME_INFO, defaultdict(float)) + total_rank_dict.setdefault(rank_id, {}).setdefault(Constant.COMMUNICATION_BANDWIDTH_INFO, {}) for com_info, com_info_dict in communication_op_info.items(): - if com_info == self.COMMUNICATION_TIME_INFO: + if com_info == Constant.COMMUNICATION_TIME_INFO: self.combine_time_info(com_info_dict, total_rank_dict[rank_id][com_info]) - if com_info == self.COMMUNICATION_BANDWIDTH_INFO: + if com_info == Constant.COMMUNICATION_BANDWIDTH_INFO: self.combine_bandwidth_info(com_info_dict, total_rank_dict[rank_id][com_info]) - self.compute_time_ratio(total_rank_dict[rank_id][self.COMMUNICATION_TIME_INFO]) - self.compute_bandwidth_ratio(total_rank_dict[rank_id][self.COMMUNICATION_BANDWIDTH_INFO]) + self.compute_time_ratio(total_rank_dict[rank_id][Constant.COMMUNICATION_TIME_INFO]) + self.compute_bandwidth_ratio(total_rank_dict[rank_id][Constant.COMMUNICATION_BANDWIDTH_INFO]) comm_ops[Constant.TOTAL_OP_INFO] = total_rank_dict def combine_time_info(self, com_info_dict: dict, total_time_info_dict: dict): - ratio_list = [self.WAIT_TIME_RATIO, self.SYNCHRONIZATION_TIME_RATIO] + ratio_list = [Constant.WAIT_TIME_RATIO, Constant.SYNCHRONIZATION_TIME_RATIO] for time_info in com_info_dict: - if time_info not in ratio_list and time_info != self.START_TIMESTAMP: + if time_info not in ratio_list and time_info != Constant.START_TIMESTAMP: total_time_info_dict[time_info] += com_info_dict.get(time_info) def combine_bandwidth_info(self, com_info_dict: dict, total_bandwidth_info_dict: dict): - add_list = [self.TRANSIT_TIME_MS, self.TRANSIT_SIZE_MB] - dict_list = [self.SIZE_DISTRIBUTION] + add_list = [Constant.TRANSIT_TIME_MS, Constant.TRANSIT_SIZE_MB] + dict_list = [Constant.SIZE_DISTRIBUTION] for transport_type, part_transport_dict in com_info_dict.items(): if transport_type not in total_bandwidth_info_dict: total_bandwidth_info_dict[transport_type] = { - self.TRANSIT_TIME_MS: 0, - self.TRANSIT_SIZE_MB: 0, - self.SIZE_DISTRIBUTION: defaultdict(lambda: [0, 0]) + Constant.TRANSIT_TIME_MS: 0, + Constant.TRANSIT_SIZE_MB: 0, + Constant.SIZE_DISTRIBUTION: defaultdict(lambda: [0, 0]) } for bandwidth_msg, 
value in part_transport_dict.items(): if bandwidth_msg in add_list: @@ -97,24 +97,24 @@ class CommunicationAnalysis: self.combine_size_distribution(value, total_bandwidth_info_dict[transport_type][bandwidth_msg]) def compute_time_ratio(self, total_time_info_dict: dict): - if total_time_info_dict[self.WAIT_TIME_MS] + total_time_info_dict[self.TRANSIT_TIME_MS] == 0: - total_time_info_dict[self.WAIT_TIME_RATIO] = 0 + if total_time_info_dict[Constant.WAIT_TIME_MS] + total_time_info_dict[Constant.TRANSIT_TIME_MS] == 0: + total_time_info_dict[Constant.WAIT_TIME_RATIO] = 0 else: - total_time_info_dict[self.WAIT_TIME_RATIO] = \ - round(total_time_info_dict[self.WAIT_TIME_MS] / - (total_time_info_dict[self.WAIT_TIME_MS] + total_time_info_dict[self.TRANSIT_TIME_MS]), 4) - if total_time_info_dict[self.SYNCHRONIZATION_TIME_MS] + total_time_info_dict[self.TRANSIT_TIME_MS] == 0: - total_time_info_dict[self.SYNCHRONIZATION_TIME_RATIO] = 0 + total_time_info_dict[Constant.WAIT_TIME_RATIO] = \ + round(total_time_info_dict[Constant.WAIT_TIME_MS] / + (total_time_info_dict[Constant.WAIT_TIME_MS] + total_time_info_dict[Constant.TRANSIT_TIME_MS]), 4) + if total_time_info_dict[Constant.SYNCHRONIZATION_TIME_MS] + total_time_info_dict[Constant.TRANSIT_TIME_MS] == 0: + total_time_info_dict[Constant.SYNCHRONIZATION_TIME_RATIO] = 0 else: - total_time_info_dict[self.SYNCHRONIZATION_TIME_RATIO] = \ - round(total_time_info_dict[self.WAIT_TIME_MS] / - (total_time_info_dict[self.SYNCHRONIZATION_TIME_MS] + - total_time_info_dict[self.TRANSIT_TIME_MS]), 4) + total_time_info_dict[Constant.SYNCHRONIZATION_TIME_RATIO] = \ + round(total_time_info_dict[Constant.WAIT_TIME_MS] / + (total_time_info_dict[Constant.SYNCHRONIZATION_TIME_MS] + + total_time_info_dict[Constant.TRANSIT_TIME_MS]), 4) def compute_bandwidth_ratio(self, total_bandwidth_info_dict: dict): for transport_type, bandwidth_dict in total_bandwidth_info_dict.items(): - if bandwidth_dict[self.TRANSIT_TIME_MS] == 0: - bandwidth_dict[self.BANDWIDTH_GB_S] = 0 + if bandwidth_dict[Constant.TRANSIT_TIME_MS] == 0: + bandwidth_dict[Constant.BANDWIDTH_GB_S] = 0 else: - bandwidth_dict[self.BANDWIDTH_GB_S] = \ - round(bandwidth_dict[self.TRANSIT_SIZE_MB] / bandwidth_dict[self.TRANSIT_TIME_MS], 4) + bandwidth_dict[Constant.BANDWIDTH_GB_S] = \ + round(bandwidth_dict[Constant.TRANSIT_SIZE_MB] / bandwidth_dict[Constant.TRANSIT_TIME_MS], 4) diff --git a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py index 4b280e88af..4e9d479c49 100644 --- a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py +++ b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py @@ -56,7 +56,7 @@ class StepTraceTimeAnalysis: FileManager.create_csv_file(self.collection_path, self.step_data_list, self.CLUSTER_TRACE_TIME_CSV, headers) def load_step_trace_time_data(self): - for rank_id, profiling_dir_path in self.data_map: + for rank_id, profiling_dir_path in self.data_map.items(): step_time_file = os.path.join(profiling_dir_path, Constant.SINGLE_OUTPUT, Constant.STEP_TIME_CSV) if step_time_file: self.step_time_dict[rank_id] = FileManager.read_csv_file(step_time_file, StepTraceTimeBean) -- Gitee From 2faa2e618b29798876a006124e9da8da31778981 Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 16:36:01 +0800 Subject: [PATCH 09/24] cluster analyze --- profiler/cluster_analyse/analysis/communication_analysis.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git 
a/profiler/cluster_analyse/analysis/communication_analysis.py b/profiler/cluster_analyse/analysis/communication_analysis.py index 040ebbff55..889384e9da 100644 --- a/profiler/cluster_analyse/analysis/communication_analysis.py +++ b/profiler/cluster_analyse/analysis/communication_analysis.py @@ -28,6 +28,12 @@ class CommunicationAnalysis: self.communication_ops = param.get(Constant.COMMUNICATION_OPS) self.comm_ops_struct = {} + @staticmethod + def combine_size_distribution(op_dict: dict, total_dict: dict): + for size, size_info in op_dict.items(): + total_dict[size][0] += size_info[0] + total_dict[size][1] += size_info[1] + def run(self): self.split_op_by_group() self.combine_ops_total_info() -- Gitee From 49163a834649c806d11a06579ac3eeb33251c461 Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 16:38:09 +0800 Subject: [PATCH 10/24] cluster analyze --- .../cluster_analyse/analysis/step_trace_time_analysis.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py index 4e9d479c49..8eae2b5591 100644 --- a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py +++ b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py @@ -64,8 +64,9 @@ class StepTraceTimeAnalysis: print(f"rank {rank_id} does not have a valid step_trace_time.json.") def analyze_step_time(self, ): - for rank_id, data_bean in self.step_time_dict.items(): - self.step_data_list.append([data_bean.step, Constant.rank, rank_id] + data_bean.row) + for rank_id, data_bean_list in self.step_time_dict.items(): + for data_bean in data_bean_list: + self.step_data_list.append([data_bean.step, Constant.rank, rank_id] + data_bean.row) stage_list = self.communication_group.get(Constant.P2P) if not stage_list: return -- Gitee From 9e7f9c684eff5d28ce8b67bd1f56d9ef2767710b Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 16:41:36 +0800 Subject: [PATCH 11/24] cluster analyze --- profiler/cluster_analyse/analysis/step_trace_time_analysis.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py index 8eae2b5591..289de153dc 100644 --- a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py +++ b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py @@ -66,7 +66,7 @@ class StepTraceTimeAnalysis: def analyze_step_time(self, ): for rank_id, data_bean_list in self.step_time_dict.items(): for data_bean in data_bean_list: - self.step_data_list.append([data_bean.step, Constant.rank, rank_id] + data_bean.row) + self.step_data_list.append([data_bean.step, Constant.RANK, rank_id] + data_bean.row) stage_list = self.communication_group.get(Constant.P2P) if not stage_list: return @@ -81,7 +81,7 @@ class StepTraceTimeAnalysis: step_group_dict.setdefault(key, []).append(data_list[3:]) for key, data_group_list in step_group_dict.items(): - self.step_time_dict.append([key[0], Constant.STAGE, key[1]] + self.get_max_data_row(data_group_list)) + self.step_time_list.append([key[0], Constant.STAGE, key[1]] + self.get_max_data_row(data_group_list)) def get_headers(self): if self.step_time_dict: -- Gitee From cc1931326ae1f708d8892d79114c39d3b309e32c Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 16:46:08 +0800 Subject: [PATCH 12/24] cluster analyze --- .../cluster_analyse/analysis/step_trace_time_analysis.py | 6 +++--- 1 
file changed, 3 insertions(+), 3 deletions(-) diff --git a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py index 289de153dc..30e884eab4 100644 --- a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py +++ b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py @@ -36,9 +36,9 @@ class StepTraceTimeAnalysis: if not data_group_list: return [] ret = [] - for idx in len(data_group_list[0]): + for idx in range(len(data_group_list[0])): max_val = 0 - for idy in len(data_group_list): + for idy in range(len(data_group_list)): max_val = max(max_val, data_group_list[idy][idx]) ret.append(max_val) return ret @@ -81,7 +81,7 @@ class StepTraceTimeAnalysis: step_group_dict.setdefault(key, []).append(data_list[3:]) for key, data_group_list in step_group_dict.items(): - self.step_time_list.append([key[0], Constant.STAGE, key[1]] + self.get_max_data_row(data_group_list)) + self.step_data_list.append([key[0], Constant.STAGE, key[1]] + self.get_max_data_row(data_group_list)) def get_headers(self): if self.step_time_dict: -- Gitee From 7cc803c5f7e3d1da08754237facbc47e0a1c866d Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 16:50:40 +0800 Subject: [PATCH 13/24] cluster analyze --- profiler/cluster_analyse/prof_bean/step_trace_time_bean.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/profiler/cluster_analyse/prof_bean/step_trace_time_bean.py b/profiler/cluster_analyse/prof_bean/step_trace_time_bean.py index b9e4de17e0..80f09d5615 100644 --- a/profiler/cluster_analyse/prof_bean/step_trace_time_bean.py +++ b/profiler/cluster_analyse/prof_bean/step_trace_time_bean.py @@ -16,7 +16,7 @@ class StepTraceTimeBean: STEP = "Step" - COMPLEMENT_HEADER = ["Type", "Index"] + COMPLEMENT_HEADER = ["Step", "Type", "Index"] def __init__(self, data: list): self._data = data @@ -27,7 +27,7 @@ class StepTraceTimeBean: for field_name in self._data.keys(): if field_name == self.STEP: continue - row.append(self._data.get(field_name, )) + row.append(float(self._data.get(field_name, ))) return row @property @@ -36,4 +36,4 @@ class StepTraceTimeBean: @property def all_headers(self) -> list: - return list(self._data.keys()[0] + self.COMPLEMENT_HEADER + self._data.keys()[1:]) + return self.COMPLEMENT_HEADER + list(self._data.keys())[1:] -- Gitee From 90e2137d09c0688cc00903ed69f1afd1a7660f0d Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 16:53:14 +0800 Subject: [PATCH 14/24] cluster analyze --- profiler/cluster_analyse/analysis/step_trace_time_analysis.py | 1 - 1 file changed, 1 deletion(-) diff --git a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py index 30e884eab4..d03991b509 100644 --- a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py +++ b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py @@ -51,7 +51,6 @@ class StepTraceTimeAnalysis: def dump_data(self): if not self.step_data_list: print("Can't get step time info!") - self.step_data_list = [row[:3].extend(row[4:]) for row in self.step_data_list] headers = self.get_headers() FileManager.create_csv_file(self.collection_path, self.step_data_list, self.CLUSTER_TRACE_TIME_CSV, headers) -- Gitee From 660e205c4d7621ac5f40951051dfe98d79aa5561 Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 17:39:01 +0800 Subject: [PATCH 15/24] cluster analyze --- profiler/cluster_analyse/analysis/communication_analysis.py | 2 +- 
.../cluster_analyse/analysis/step_trace_time_analysis.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/profiler/cluster_analyse/analysis/communication_analysis.py b/profiler/cluster_analyse/analysis/communication_analysis.py index 889384e9da..9e0e1b60d5 100644 --- a/profiler/cluster_analyse/analysis/communication_analysis.py +++ b/profiler/cluster_analyse/analysis/communication_analysis.py @@ -40,7 +40,7 @@ class CommunicationAnalysis: self.dump_data() def dump_data(self): - if self.comm_ops_struct: + if not self.comm_ops_struct: print("There is no final comm ops data generated") return FileManager.create_json_file(self.collection_path, self.comm_ops_struct, self.CLUSTER_COMMUNICATION_JSON) diff --git a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py index d03991b509..5f04978116 100644 --- a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py +++ b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py @@ -62,7 +62,7 @@ class StepTraceTimeAnalysis: if not self.step_time_dict.get(rank_id): print(f"rank {rank_id} does not have a valid step_trace_time.json.") - def analyze_step_time(self, ): + def analyze_step_time(self): for rank_id, data_bean_list in self.step_time_dict.items(): for data_bean in data_bean_list: self.step_data_list.append([data_bean.step, Constant.RANK, rank_id] + data_bean.row) @@ -73,8 +73,8 @@ class StepTraceTimeAnalysis: for data_list in self.step_data_list: stage_group = 'None' for stage in stage_list: - if data_list[1] in stage: - stage_group = stage + if data_list[2] in stage: + stage_group = tuple(stage) break key = (data_list[0], stage_group) step_group_dict.setdefault(key, []).append(data_list[3:]) -- Gitee From 4b9c00ff668b185362cc2d5c84f3f4f5a8a9d8aa Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 17:58:06 +0800 Subject: [PATCH 16/24] cluster analyze --- profiler/cluster_analyse/analysis/communication_analysis.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/profiler/cluster_analyse/analysis/communication_analysis.py b/profiler/cluster_analyse/analysis/communication_analysis.py index 9e0e1b60d5..10e56b24ca 100644 --- a/profiler/cluster_analyse/analysis/communication_analysis.py +++ b/profiler/cluster_analyse/analysis/communication_analysis.py @@ -43,7 +43,10 @@ class CommunicationAnalysis: if not self.comm_ops_struct: print("There is no final comm ops data generated") return - FileManager.create_json_file(self.collection_path, self.comm_ops_struct, self.CLUSTER_COMMUNICATION_JSON) + output_comm_data = {} + for key in self.comm_ops_struct: + output_comm_data[str(key)] = self.comm_ops_struct.get(key) + FileManager.create_json_file(self.collection_path, output_comm_data, self.CLUSTER_COMMUNICATION_JSON) def split_op_by_group(self): for single_op in self.communication_ops: -- Gitee From d29b06ca415000445f661626e039fe9813a0d7e6 Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 20:51:34 +0800 Subject: [PATCH 17/24] cluster analyze --- .../analysis/communication_analysis.py | 7 ++++--- .../pytorch_data_preprocessor.py | 7 +++---- profiler/cluster_analyse/common_func/constant.py | 1 + .../cluster_analyse/common_func/file_manager.py | 16 ++++++++++++---- .../communication_group_generator.py | 14 ++++++++------ 5 files changed, 28 insertions(+), 17 deletions(-) diff --git a/profiler/cluster_analyse/analysis/communication_analysis.py b/profiler/cluster_analyse/analysis/communication_analysis.py index 
10e56b24ca..19f2bf7d57 100644 --- a/profiler/cluster_analyse/analysis/communication_analysis.py +++ b/profiler/cluster_analyse/analysis/communication_analysis.py @@ -19,7 +19,7 @@ from common_func.file_manager import FileManager class CommunicationAnalysis: - CLUSTER_COMMUNICATION_JSON = "cluster_commnunication.json" + CLUSTER_COMMUNICATION_JSON = "cluster_communication.json" def __init__(self, param: dict): self.collection_path = param.get(Constant.COLLECTION_PATH) @@ -79,8 +79,9 @@ class CommunicationAnalysis: self.combine_time_info(com_info_dict, total_rank_dict[rank_id][com_info]) if com_info == Constant.COMMUNICATION_BANDWIDTH_INFO: self.combine_bandwidth_info(com_info_dict, total_rank_dict[rank_id][com_info]) - self.compute_time_ratio(total_rank_dict[rank_id][Constant.COMMUNICATION_TIME_INFO]) - self.compute_bandwidth_ratio(total_rank_dict[rank_id][Constant.COMMUNICATION_BANDWIDTH_INFO]) + for rank_id in total_rank_dict: + self.compute_time_ratio(total_rank_dict[rank_id][Constant.COMMUNICATION_TIME_INFO]) + self.compute_bandwidth_ratio(total_rank_dict[rank_id][Constant.COMMUNICATION_BANDWIDTH_INFO]) comm_ops[Constant.TOTAL_OP_INFO] = total_rank_dict def combine_time_info(self, com_info_dict: dict, total_time_info_dict: dict): diff --git a/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py b/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py index 44e3737319..2870048a5f 100644 --- a/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py +++ b/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py @@ -14,6 +14,7 @@ # limitations under the License. from collections import defaultdict +from common_func.file_manager import FileManager import os @@ -25,9 +26,7 @@ class PytorchDataPreprocessor: self.path = os.path.realpath(path) def get_data_map(self) -> dict: - if not os.path.exists(self.path) or not os.access(self.path, os.R_OK): - print('[Error]path:{} not exist or not accessable.'.format(self.path)) - return dict() + FileManager.check_file_or_directory_path(self.path, isdir=True) collector_dirs = [dir_name for dir_name in os.listdir(self.path) if os.path.isdir(os.path.join(self.path, dir_name))] ascend_pt_dirs = [dir_name for dir_name in collector_dirs if dir_name.endswith("ascend_pt")] @@ -37,7 +36,7 @@ class PytorchDataPreprocessor: rank_id = self.get_rank_id(dir_name) if rank_id < 0: print('[Error]fail to get rankid or rankid invalid.') - + continue rank_id_map[rank_id].append(dir_name) ret_dict = dict() diff --git a/profiler/cluster_analyse/common_func/constant.py b/profiler/cluster_analyse/common_func/constant.py index aa3f6b7a9e..84d4a9e054 100644 --- a/profiler/cluster_analyse/common_func/constant.py +++ b/profiler/cluster_analyse/common_func/constant.py @@ -27,6 +27,7 @@ class Constant(object): DIR_AUTHORITY = 0o750 MAX_JSON_SIZE = 1024 * 1024 * 1024 * 10 MAX_CSV_SIZE = 1024 * 1024 * 1024 * 5 + MAX_PATH_LENGTH = 4096 # communication P2P = "p2p" diff --git a/profiler/cluster_analyse/common_func/file_manager.py b/profiler/cluster_analyse/common_func/file_manager.py index 5ee9197bf4..3ac6b843a7 100644 --- a/profiler/cluster_analyse/common_func/file_manager.py +++ b/profiler/cluster_analyse/common_func/file_manager.py @@ -33,6 +33,18 @@ class FileManager: Exception Description: when invalid data throw exception """ + if not os.access(path, os.R_OK): + raise RuntimeError( + 'The path {} does not have permission to read. 
Please check the path permission'.format(path)) + + if len(path) > Constant.MAX_PATH_LENGTH: + msg = f"The length of file path exceeded the maximum value {Constant.MAX_PATH_LENGTH}: {path}" + raise RuntimeError(msg) + + if os.path.islink(path): + msg = f"Invalid profiling path is soft link: {path}" + raise RuntimeError(msg) + if isdir: if not os.path.exists(path): raise RuntimeError('The path {} is not exist.'.format(path)) @@ -47,10 +59,6 @@ class FileManager: if not os.path.isfile(path): raise RuntimeError('{} is an invalid file or non-exist.'.format(path)) - if not os.access(path, os.R_OK): - raise RuntimeError( - 'The path {} does not have permission to read. Please check the path permission'.format(path)) - @classmethod def read_csv_file(cls, file_path: str, class_bean: any) -> list: cls.check_file_or_directory_path(file_path)
diff --git a/profiler/cluster_analyse/communication_group/communication_group_generator.py b/profiler/cluster_analyse/communication_group/communication_group_generator.py index bb156b2c85..0202ea0e56 100644 --- a/profiler/cluster_analyse/communication_group/communication_group_generator.py +++ b/profiler/cluster_analyse/communication_group/communication_group_generator.py @@ -70,12 +70,14 @@ class CommunicationGroupGenerator: stage_group = {} for rank_set in self.collective_group_dict.values(): unioned_set = {} + remove_key = [] for first_rank, stage in stage_group.items(): - if UnionFind.is_connectedt(rank_set, stage): - unioned_set = UnionFind.union(rank_set, stage) - del stage_group[first_rank] - break + if UnionFind.is_connected(rank_set, stage): + unioned_set = UnionFind.union(rank_set, stage, unioned_set) + remove_key.append(first_rank) if unioned_set: + for key in remove_key: + del stage_group[key] stage_group[min(unioned_set)] = unioned_set else: stage_group[min(rank_set)] = rank_set @@ -108,9 +110,9 @@ class CommunicationGroupGenerator: class UnionFind(object): """Disjoint Set Union""" @classmethod - def union(cls, p: set, q: set): + def union(cls, p: set, q: set, o: set): """make p and q the same set""" - return p | q + return p | q | o @classmethod def is_connected(cls, p: set, q: set): -- Gitee
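Note on the stage-group merge fixed in PATCH 17/24 above: the sketch below is a minimal, self-contained rendering of the corrected logic, not the project's actual module. The body of UnionFind.is_connected is truncated in the diff, so the non-empty-intersection test here is an assumption; note also that the context line `unioned_set = {}` creates an empty dict, which `p | q | o` cannot accept once it is passed to union, so the sketch starts from an empty set instead.

    # Hedged sketch of the stage-group merge after PATCH 17/24.
    # Assumption: UnionFind.is_connected(p, q) means "p and q share a rank";
    # its real body is cut off in the diff above.
    class UnionFind(object):
        """Disjoint Set Union"""

        @classmethod
        def union(cls, p: set, q: set, o: set):
            """make p, q and o the same set"""
            return p | q | o

        @classmethod
        def is_connected(cls, p: set, q: set):
            return len(p & q) > 0  # assumed semantics

    def merge_stages(collective_groups):
        stage_group = {}
        for rank_set in collective_groups:
            unioned_set = set()  # a set, not {} (an empty dict), so union() works
            remove_key = []
            for first_rank, stage in stage_group.items():
                if UnionFind.is_connected(rank_set, stage):
                    unioned_set = UnionFind.union(rank_set, stage, unioned_set)
                    remove_key.append(first_rank)
            if unioned_set:
                for key in remove_key:
                    del stage_group[key]
                stage_group[min(unioned_set)] = unioned_set
            else:
                stage_group[min(rank_set)] = rank_set
        return stage_group

    # merge_stages([{0, 1}, {1, 2}, {4, 5}]) -> {0: {0, 1, 2}, 4: {4, 5}}

Accumulating remove_key and deleting after the loop lets one rank_set bridge several previously separate stages, which the replaced del-and-break version could not.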
From 803fc230afcfd93417e4c712db97d486b5a75b68 Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 21:43:07 +0800 Subject: [PATCH 19/24] profiling analysis --- profiler/compare_tools/performance_compare.py | 5 +- .../profiling_analysis/__init__.py | 14 ++ .../profiling_analysis/gpu_parser.py | 110 ++++++++++++ .../profiling_analysis/npu_parser.py | 165 ++++++++++++++++++ .../profiling_analysis/parser_helper.py | 37 ++++ .../profiling_analysis/profiling_parse.py | 99 +++++++++++ 6 files changed, 429 insertions(+), 1 deletion(-) create mode 100644 profiler/compare_tools/profiling_analysis/__init__.py create mode 100644 profiler/compare_tools/profiling_analysis/gpu_parser.py create mode 100644 profiler/compare_tools/profiling_analysis/npu_parser.py create mode 100644 profiler/compare_tools/profiling_analysis/parser_helper.py create mode 100644 profiler/compare_tools/profiling_analysis/profiling_parse.py
diff --git a/profiler/compare_tools/performance_compare.py b/profiler/compare_tools/performance_compare.py index e2c47ef6c5..cc149ba478 100644 --- a/profiler/compare_tools/performance_compare.py +++ b/profiler/compare_tools/performance_compare.py @@ -7,13 +7,15 @@ import time from generation.comparison_generator import ComparisonGenerator from utils.args_manager import ArgsManager - +from profiling_analysis.profiling_parse import prof_main def main(): sys.path.append(os.path.dirname(__file__)) parser = argparse.ArgumentParser(description="Compare trace of GPU and NPU") parser.add_argument("base_profiling_path", type=str, default='', help="base profiling file path") parser.add_argument("comparison_profiling_path", type=str, default='', help="comparison profiling file path") + parser.add_argument("--disable_profiling_compare", default=False, action='store_true', + help="do not perform the GPU vs NPU performance breakdown") parser.add_argument("--disable_operator_compare", default=False, action='store_true', help="do not compare operator execution time") parser.add_argument("--disable_memory_compare", default=False, action='store_true', @@ -29,6
+31,7 @@ def main(): args = parser.parse_args() ArgsManager().init(args) + prof_main() dir_path = args.output_path if args.output_path else "./" file_name = "performance_comparison_result_{}.xlsx".format(time.strftime("%Y%m%d%H%M%S", time.localtime(time.time()))) result_file_path = os.path.join(dir_path, file_name) diff --git a/profiler/compare_tools/profiling_analysis/__init__.py b/profiler/compare_tools/profiling_analysis/__init__.py new file mode 100644 index 0000000000..8400fd5ecd --- /dev/null +++ b/profiler/compare_tools/profiling_analysis/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) 2023, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/profiler/compare_tools/profiling_analysis/gpu_parser.py b/profiler/compare_tools/profiling_analysis/gpu_parser.py new file mode 100644 index 0000000000..425b25bc19 --- /dev/null +++ b/profiler/compare_tools/profiling_analysis/gpu_parser.py @@ -0,0 +1,110 @@ +# Copyright (c) 2023, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from collections import Counter, defaultdict +import pandas as pd + +import parser_helper + + +class GpuProfilingParser: + def __init__(self, args): + self.trace_events = self.read_profiling_json_file(args.gpu) + self.compute_stream_id = self.infer_compute_stream_id() + self.one_step_time = 0 + self.profiling_info = parser_helper.ProfilingInfo() + + @staticmethod + def read_profiling_json_file(json_path): + data = parser_helper.read_json_file(json_path) + if 'traceEvents' not in data: + raise RuntimeError("The gpu profiling json doesn't contain traceEvents data.") + return data.get('traceEvents') + + def parse_events(self): + cube_time = 0.0 + all_op_time = 0.0 + op_list = [] + compute_stream_dur = 0.0 # compute stream elapsed time + marks = defaultdict(int) # timestep marks used to derive communication time not overlapped by compute + + for event in self.trace_events: + if not isinstance(event, dict): + continue + if event.get('args') and event.get('args').get('stream') == self.compute_stream_id: + compute_stream_dur += float(event.get('dur')) + if not {'name', 'cat', 'dur', 'ts'} < event.keys(): + continue + name = event.get('name') + dur = event.get('dur') + ts = event.get('ts') + cat = event.get('cat') + if cat.lower() != 'kernel': + continue + if 'nccl' in name: + for timestep in range(ts + 1, ts + dur + 1): + marks[str(timestep)] += 1 # mark this timestep in communication stream + continue + else: + for timestep in range(ts + 1, ts + dur + 1): + marks[str(timestep)] += -100 # mark this timestep in compute stream + if 'gemm' in name: + cube_time += float(dur) + all_op_time += float(dur) + op_list.append([ts, name, cat, dur]) + op_dataframe = pd.DataFrame(op_list, columns=['time start', 'name', 'cat', 'dur']) + op_dataframe.to_csv('gpu_perf.csv', index=False) + self.profiling_info.compute_time = compute_stream_dur / 10 ** 6 + self.profiling_info.communication_not_overlapped = len([_ for _, value in marks.items() if value > 0]) / 10 ** 6 + self.profiling_info.cube_time = cube_time / 10 ** 6 + self.profiling_info.vector_time = (all_op_time - cube_time) / 10 ** 6 + self.parse_e2e_time() + if self.one_step_time: + self.profiling_info.scheduling_time = self.one_step_time - all_op_time / 10 ** 6 - \ + self.profiling_info.communication_not_overlapped + else: + self.profiling_info.scheduling_time = self.profiling_info.e2e_time - all_op_time / 10 ** 6 - \ + self.profiling_info.communication_not_overlapped + self.profiling_info.scheduling_ratio = self.profiling_info.scheduling_time / self.profiling_info.e2e_time + self.parse_memory_reserved() + + def parse_e2e_time(self): + compute_events_timeline = [event for event in self.trace_events if + event.get('args') and event.get('args').get('stream') == self.compute_stream_id] + compute_events_timeline = sorted(compute_events_timeline, key=lambda event: event.get('ts')) + self.profiling_info.e2e_time = (compute_events_timeline[-1].get('ts') + compute_events_timeline[-1].get('dur') - + compute_events_timeline[0].get('ts')) / 10 ** 6 + + def parse_memory_reserved(self): + memories = [ + event.get('args').get('Total Reserved') for event in self.trace_events + if event.get('name') == '[memory]' and event.get('args').get('Device Id') >= 0 + ] + if not memories: + print("Gpu profiling data doesn't contain memory info") + return + self.profiling_info.memory_used = max(memories) / 1024 ** 3 + + def infer_compute_stream_id(self): + kernel_stream_ids = [] + for event in self.trace_events: + is_kernel_exec_event = event.get('cat') == 'Kernel' and 'nccl' not in event.get('name') + has_stream_id_event =
event.get('args') and event.get('args').get('stream') + if is_kernel_exec_event and has_stream_id_event: + kernel_stream_ids.append(event.get('args').get('stream')) + if not kernel_stream_ids: + raise RuntimeError('The profiling data does not contain kernel running data.') + counter = Counter(kernel_stream_ids) + return counter.most_common(1)[0][0]
diff --git a/profiler/compare_tools/profiling_analysis/npu_parser.py b/profiler/compare_tools/profiling_analysis/npu_parser.py new file mode 100644 index 0000000000..333d8a2682 --- /dev/null +++ b/profiler/compare_tools/profiling_analysis/npu_parser.py @@ -0,0 +1,165 @@ +# Copyright (c) 2023, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import pandas as pd +from collections import defaultdict +import parser_helper + + +class NpuProfilingParser: + def __init__(self, npu_step_time, add_cube_name, npu_file_path): + self.npu_json_file = npu_file_path.get('trace_view') + self.npu_summary_file = npu_file_path.get('op_summary') + self.npu_mem_file = npu_file_path.get('memory_record') + self.profiling_info = parser_helper.ProfilingInfo() + self.npu_step_time = npu_step_time + self.parallel_time = 0 + self.aicore_time = 0 + self.cube_op_type = ['MatMul', 'BatchMatMul'] + self.cube_op_type = list(set(self.cube_op_type + add_cube_name)) + self.min_aicore_ts = sys.float_info.max + self.max_aicore_ts = sys.float_info.min + + def parse_npu_json_events(self): + if not self.npu_json_file: + print('Npu trace json file is not available.') + return + compute_time = 0 + min_ts = sys.float_info.max + max_ts = sys.float_info.min + ts_flag = False # False means no compute_time duration was collected + data = parser_helper.read_json_file(self.npu_json_file) + event_wait_sqe = defaultdict(list) + ai_core_dict = defaultdict(list) + event_wait_sqe_res = defaultdict(float) + ai_core_res = defaultdict(float) + for dic in data: + self.get_ts_by_task_type(dic, event_wait_sqe, ai_core_dict, event_wait_sqe_res, ai_core_res) + if ('name' in dic) and (dic.get('name') == 'compute_time'): + ts_flag = True + ts = dic.get('ts') + dur = dic.get('dur') + compute_time += dur + min_ts = ts if ts < min_ts else min_ts + max_ts = (ts + dur) if (ts + dur) > max_ts else max_ts + # a stream with both AI_CORE and EVENT_WAIT_SQE tasks is the compute stream + compute_stream = [] + parallel_stream = [] + # single stream: no operator parallelism + if len(ai_core_dict) == 1: + compute_stream.append(min(ai_core_dict.keys())) + elif len(ai_core_dict) == 2: # two ai_core streams: a parallel stream exists (at most two compute streams for now) + compute_stream = list(event_wait_sqe.keys() & ai_core_dict.keys()) + parallel_stream = list(ai_core_dict.keys() - set(compute_stream)) + else: + print('Npu trace json file lacks Stream info') + return + cs_event_wait_sqe_list = event_wait_sqe[compute_stream[0]] + if parallel_stream: + cs_ai_core_list = ai_core_dict[parallel_stream[0]] + cs_event_wait_sqe_list.sort(key=lambda x: x[0]) + cs_ai_core_list.sort(key=lambda x: x[0]) + self.parallel_time = self.interval_intersection(cs_event_wait_sqe_list, cs_ai_core_list) +
self.profiling_info.compute_time = compute_time / 10 ** 6 if ts_flag else ai_core_res[compute_stream[0]] / 10 ** 6 + self.profiling_info.e2e_time = (max_ts - min_ts) / 10 ** 6 if ts_flag else (self.max_aicore_ts - self.min_aicore_ts) / 10 ** 6 + self.profiling_info.communication_not_overlapped = (event_wait_sqe_res[compute_stream[0]] - + self.parallel_time) / 10 ** 6 + time_required = (self.profiling_info.cube_time + self.profiling_info.vector_time) + \ + self.profiling_info.communication_not_overlapped + if self.npu_step_time: + self.profiling_info.scheduling_time = self.npu_step_time - time_required + else: + self.profiling_info.scheduling_time = self.profiling_info.e2e_time - time_required + self.profiling_info.scheduling_ratio = self.profiling_info.scheduling_time / self.profiling_info.e2e_time \ + if self.profiling_info.e2e_time != 0 else 0 + + def parse_npu_csv_events(self): + if not self.npu_summary_file: + print('Npu op summary csv file is not available.') + return + info = pd.read_csv(self.npu_summary_file, index_col=None) + cube_time = 0.0 + vec_time = 0.0 + ai_core_time = 0.0 + vec_mac_flag = True # True means the summary file contains PMU info + if info.get('aic_mac_time(us)') is None or info.get('aiv_vec_time(us)') is None: + print('The profiling result may be in simplified mode; cube operators are identified via the cube op whitelist below:') + print(self.cube_op_type) + vec_mac_flag = False + for i in range(len(info['Model ID'])): + task_type = info.loc[i, 'Task Type'] + if task_type not in ['AI_CORE']: + continue + task_durations = info.loc[i, 'Task Duration(us)'] + ai_core_time += task_durations + op_type = info.loc[i, 'OP Type'] + if not vec_mac_flag: # in simplified mode, count cube time by OP Type and skip the PMU-based path + cube_time += task_durations if op_type in self.cube_op_type else 0.0 + continue + aiv_vec_time = info.loc[i, 'aiv_vec_time(us)'] + if aiv_vec_time > 0: + vec_time += task_durations + + if vec_mac_flag: + cube_time = (ai_core_time - vec_time) / 10 ** 6 + vec_time /= 10 ** 6 + else: + vec_time = (ai_core_time - cube_time) / 10 ** 6 + cube_time /= 10 ** 6 + self.profiling_info.cube_time = cube_time + self.profiling_info.vector_time = vec_time + if not self.npu_mem_file: + print('Npu op memory csv file is not available.') + return + try: + info = pd.read_csv(self.npu_mem_file, usecols=['Total Reserved(MB)'], index_col=None) + except ValueError: + print('Npu profiling data does not contain memory info.') + else: + self.profiling_info.memory_used = max(info.get('Total Reserved(MB)')) / 1024 + + @staticmethod + def interval_intersection(cs_event_wait_sqe_list, cs_ai_core_list): + ans = 0 + i = 0 + j = 0 + while i < len(cs_event_wait_sqe_list) and j < len(cs_ai_core_list): + lo = max(cs_event_wait_sqe_list[i][0], cs_ai_core_list[j][0]) + hi = min(cs_event_wait_sqe_list[i][1], cs_ai_core_list[j][1]) + if lo <= hi: + ans += (hi - lo) + if cs_event_wait_sqe_list[i][1] < cs_ai_core_list[j][1]: + i += 1 + else: + j += 1 + return ans + + def get_ts_by_task_type(self, dic, event_wait_sqe, ai_core_dict, event_wait_res, ai_core_res): + if not dic.get('args'): + return + args = dic.get('args') + if args.get('Stream Id'): + stream_id = args.get('Stream Id') + ts = dic.get('ts') + dur = dic.get('dur') + if args.get('Task Type') == 'EVENT_WAIT_SQE': + event_wait_res[stream_id] += dur + event_wait_sqe[stream_id].append([ts, ts + dur]) + elif args.get('Task Type') == 'AI_CORE': + self.min_aicore_ts = ts if ts < self.min_aicore_ts else self.min_aicore_ts + self.max_aicore_ts = (ts + dur) if (ts + dur) > self.max_aicore_ts else self.max_aicore_ts + ai_core_res[stream_id] += dur +
ai_core_dict[stream_id].append([ts, ts + dur]) diff --git a/profiler/compare_tools/profiling_analysis/parser_helper.py b/profiler/compare_tools/profiling_analysis/parser_helper.py new file mode 100644 index 0000000000..958a3146bb --- /dev/null +++ b/profiler/compare_tools/profiling_analysis/parser_helper.py @@ -0,0 +1,37 @@ +# Copyright (c) 2023, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os + + +class ProfilingInfo: + def __init__(self): + self.cube_time = 0.0 + self.vector_time = 0.0 + self.compute_time = 0.0 + self.communication_not_overlapped = 0.0 + self.scheduling_ratio = 0.0 + self.memory_used = 0.0 + self.e2e_time = 0.0 + self.scheduling_time = 0.0 + + +def read_json_file(path): + if not os.path.isfile(path): + raise ValueError(f'The path "{path}" is not a valid json file.') + with open(path, 'r', encoding='utf-8') as json_handler: + data = json.load(json_handler) + return data diff --git a/profiler/compare_tools/profiling_analysis/profiling_parse.py b/profiler/compare_tools/profiling_analysis/profiling_parse.py new file mode 100644 index 0000000000..7af844db70 --- /dev/null +++ b/profiler/compare_tools/profiling_analysis/profiling_parse.py @@ -0,0 +1,99 @@ +# Copyright (c) 2023, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import argparse +import os + +from prettytable import PrettyTable + +from gpu_parser import GpuProfilingParser +from npu_parser import NpuProfilingParser +from parser_helper import ProfilingInfo +from compare_tools.utils.args_manager import ArgsManager +from compare_tools.utils.constant import Constant + + +def parse_command(): + parser = argparse.ArgumentParser() + parser.add_argument('-g', '--gpu', required=False, default='', metavar='(FILE)', help='Gpu profiling json file.') + parser.add_argument('-glt', '--gpu_log_time', required=False, default=0.0, type=float, help='Gpu one step time(s)') + parser.add_argument('-n', '--npu', required=False, default='', metavar='(FILE)', + help='Npu single core profiling root path.') + parser.add_argument('-nlt', '--npu_log_time', required=False, default=0.0, metavar='(FILE)', type=float, + help='Npu one step time(s).') + parser.add_argument('-aop', '--add_cube_op', required=False, default=[], nargs='*', help='add cube op name') + return parser.parse_args() + + +def show_table(gpu_profiling_info, npu_profiling_info): + table = PrettyTable() + table.title = 'Large model performance breakdown' + table.field_names = ['', 'cube ops', 'vector ops', 'compute stream time', 'communication', 'scheduling time', 'scheduling ratio', 'memory', + 'E2E time'] + table.add_row(['GPU baseline', f'{gpu_profiling_info.cube_time:.3f}s', f'{gpu_profiling_info.vector_time:.3f}s', + f'{gpu_profiling_info.compute_time:.3f}s', f'{gpu_profiling_info.communication_not_overlapped: .3f}s', + f'{gpu_profiling_info.scheduling_time:.3f}', f'{gpu_profiling_info.scheduling_ratio:.2%}', + f'{gpu_profiling_info.memory_used:.2f}G', f'{gpu_profiling_info.e2e_time:.3f}s']) + table.add_row(['current status', f'{npu_profiling_info.cube_time:.3f}s', f'{npu_profiling_info.vector_time:.3f}s', + f'{npu_profiling_info.compute_time:.3f}s', f'{npu_profiling_info.communication_not_overlapped: .3f}s', + f'{npu_profiling_info.scheduling_time:.3f}', f'{npu_profiling_info.scheduling_ratio:.2%}', + f'{npu_profiling_info.memory_used:.2f}G', f'{npu_profiling_info.e2e_time:.3f}s']) + print(table) + + +def parse_gpu(args): + if args.gpu: + gpu_parser = GpuProfilingParser(args) + gpu_parser.parse_events() + return gpu_parser.profiling_info + print('Gpu trace json file is not specified.') + return ProfilingInfo() + + +def parse_npu(args, npu_path): + npu_parser = NpuProfilingParser(0, False, npu_path) + npu_parser.parse_npu_csv_events() + npu_parser.parse_npu_json_events() + return npu_parser.profiling_info + + +def prof_main(): + args = parse_command() + if ArgsManager().base_profiling_type == Constant.NPU: + args.npu = ArgsManager().base_profiling + elif ArgsManager().base_profiling_type == Constant.GPU: + args.gpu = ArgsManager().base_profiling + if ArgsManager().comparison_profiling_type == Constant.NPU: + args.npu = ArgsManager().comparison_profiling + elif ArgsManager().comparison_profiling_type == Constant.GPU: + args.gpu = ArgsManager().comparison_profiling + + if not args.npu or not args.gpu: + return + + npu_path = {'trace_view': None, 'memory_record': None, 'op_summary': None} + for root, _, files in os.walk(args.npu): + for file in files: + if file == 'trace_view.json': + npu_path['trace_view'] = os.path.join(root, file) + if file == 'memory_record.csv': + npu_path['memory_record'] = os.path.join(root, file) + if 'op_summary_' in file: + npu_path['op_summary'] = os.path.join(root, file) + show_table(parse_gpu(args), parse_npu(args, npu_path)) + + +if __name__ == '__main__': + prof_main() -- Gitee
From ef9cafeb374a9f01471dc76f822455458419c714 Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15
Aug 2023 21:59:56 +0800 Subject: [PATCH 20/24] profiling analysis --- profiler/compare_tools/performance_compare.py | 3 ++- .../profiling_analysis/profiling_parse.py | 11 ++++++----- 2 files changed, 8 insertions(+), 6 deletions(-)
diff --git a/profiler/compare_tools/performance_compare.py b/profiler/compare_tools/performance_compare.py index cc149ba478..ae85c6b49d 100644 --- a/profiler/compare_tools/performance_compare.py +++ b/profiler/compare_tools/performance_compare.py @@ -9,6 +9,7 @@ from generation.comparison_generator import ComparisonGenerator from utils.args_manager import ArgsManager from profiling_analysis.profiling_parse import prof_main + def main(): sys.path.append(os.path.dirname(__file__)) parser = argparse.ArgumentParser(description="Compare trace of GPU and NPU") @@ -31,7 +32,7 @@ def main(): args = parser.parse_args() ArgsManager().init(args) - prof_main() + prof_main(args) dir_path = args.output_path if args.output_path else "./" file_name = "performance_comparison_result_{}.xlsx".format(time.strftime("%Y%m%d%H%M%S", time.localtime(time.time()))) result_file_path = os.path.join(dir_path, file_name)
diff --git a/profiler/compare_tools/profiling_analysis/profiling_parse.py b/profiler/compare_tools/profiling_analysis/profiling_parse.py index 7af844db70..05efb33183 100644 --- a/profiler/compare_tools/profiling_analysis/profiling_parse.py +++ b/profiler/compare_tools/profiling_analysis/profiling_parse.py @@ -18,9 +18,9 @@ import os from prettytable import PrettyTable -from gpu_parser import GpuProfilingParser -from npu_parser import NpuProfilingParser -from parser_helper import ProfilingInfo +from compare_tools.profiling_analysis.gpu_parser import GpuProfilingParser +from compare_tools.profiling_analysis.npu_parser import NpuProfilingParser +from compare_tools.profiling_analysis.parser_helper import ProfilingInfo from compare_tools.utils.args_manager import ArgsManager from compare_tools.utils.constant import Constant @@ -69,8 +69,9 @@ def parse_npu(args, npu_path): return npu_parser.profiling_info -def prof_main(): - args = parse_command() +def prof_main(args): + if not args: + args = parse_command() if ArgsManager().base_profiling_type == Constant.NPU: args.npu = ArgsManager().base_profiling elif ArgsManager().base_profiling_type == Constant.GPU: -- Gitee
From 95d9291ac0e257a22b012de0bcf9883defb23727 Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 22:42:10 +0800 Subject: [PATCH 21/24] profiling analysis --- profiler/compare_tools/performance_compare.py | 17 +++++++++++- .../profiling_analysis/gpu_parser.py | 4 +-- .../profiling_analysis/profiling_parse.py | 27 ++++++------------- 3 files changed, 26 insertions(+), 22 deletions(-)
diff --git a/profiler/compare_tools/performance_compare.py b/profiler/compare_tools/performance_compare.py index ae85c6b49d..812a501b85 100644 --- a/profiler/compare_tools/performance_compare.py +++ b/profiler/compare_tools/performance_compare.py @@ -8,6 +8,21 @@ from generation.comparison_generator import ComparisonGenerator from utils.args_manager import ArgsManager from profiling_analysis.profiling_parse import prof_main +from utils.constant import Constant + + +def performance_compare(): + npu_path = '' + gpu_path = '' + if ArgsManager().base_profiling_type == Constant.NPU: + npu_path = ArgsManager().base_profiling + elif ArgsManager().base_profiling_type == Constant.GPU: + gpu_path = ArgsManager().base_profiling + if ArgsManager().comparison_profiling_type == Constant.NPU: + npu_path =
ArgsManager().comparison_profiling + elif ArgsManager().comparison_profiling_type == Constant.GPU: + gpu_path = ArgsManager().comparison_profiling + prof_main(npu_path, gpu_path) def main(): @@ -32,7 +47,7 @@ def main(): args = parser.parse_args() ArgsManager().init(args) - prof_main(args) + dir_path = args.output_path if args.output_path else "./" file_name = "performance_comparison_result_{}.xlsx".format(time.strftime("%Y%m%d%H%M%S", time.localtime(time.time()))) result_file_path = os.path.join(dir_path, file_name) diff --git a/profiler/compare_tools/profiling_analysis/gpu_parser.py b/profiler/compare_tools/profiling_analysis/gpu_parser.py index 425b25bc19..92936e3916 100644 --- a/profiler/compare_tools/profiling_analysis/gpu_parser.py +++ b/profiler/compare_tools/profiling_analysis/gpu_parser.py @@ -20,8 +20,8 @@ import parser_helper class GpuProfilingParser: - def __init__(self, args): - self.trace_events = self.read_profiling_json_file(args.gpu) + def __init__(self, gpu_path): + self.trace_events = self.read_profiling_json_file(gpu_path) self.compute_stream_id = self.infer_compute_stream_id() self.one_step_time = 0 self.profiling_info = parser_helper.ProfilingInfo() diff --git a/profiler/compare_tools/profiling_analysis/profiling_parse.py b/profiler/compare_tools/profiling_analysis/profiling_parse.py index 05efb33183..3c91a0a72e 100644 --- a/profiler/compare_tools/profiling_analysis/profiling_parse.py +++ b/profiler/compare_tools/profiling_analysis/profiling_parse.py @@ -53,39 +53,28 @@ def show_table(gpu_profiling_info, npu_profiling_info): print(table) -def parse_gpu(args): - if args.gpu: - gpu_parser = GpuProfilingParser(args) +def parse_gpu(gpu_path): + if gpu_path: + gpu_parser = GpuProfilingParser(gpu_path) gpu_parser.parse_events() return gpu_parser.profiling_info print('Gpu trace json file is not specified.') return ProfilingInfo() -def parse_npu(args, npu_path): +def parse_npu(npu_path): npu_parser = NpuProfilingParser(0, False, npu_path) npu_parser.parse_npu_csv_events() npu_parser.parse_npu_json_events() return npu_parser.profiling_info -def prof_main(args): - if not args: - args = parse_command() - if ArgsManager().base_profiling_type == Constant.NPU: - args.npu = ArgsManager().base_profiling - elif ArgsManager().base_profiling_type == Constant.GPU: - args.gpu = ArgsManager().base_profiling - if ArgsManager().comparison_profiling_type == Constant.NPU: - args.npu = ArgsManager().comparison_profiling - elif ArgsManager().comparison_profiling_type == Constant.GPU: - args.gpu = ArgsManager().comparison_profiling - - if not args.npu or not args.gpu: +def prof_main(npu_path, gpu_path): + if not npu_path or not gpu_path: return npu_path = {'trace_view': None, 'memory_record': None, 'op_summary': None} - for root, _, files in os.walk(args.npu): + for root, _, files in os.walk(npu_path): for file in files: if file == 'trace_view.json': npu_path['trace_view'] = os.path.join(root, file) @@ -93,7 +82,7 @@ def prof_main(args): npu_path['memory_record'] = os.path.join(root, file) if 'op_summary_' in file: npu_path['op_summary'] = os.path.join(root, file) - show_table(parse_gpu(args), parse_npu(args, npu_path)) + show_table(parse_gpu(gpu_path), parse_npu(npu_path)) if __name__ == '__main__': -- Gitee From 8eaf33411e78e718336bc7188d94a2eca343b30d Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 22:46:31 +0800 Subject: [PATCH 22/24] profiling analysis --- profiler/compare_tools/performance_compare.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git 
a/profiler/compare_tools/performance_compare.py b/profiler/compare_tools/performance_compare.py index 812a501b85..2df68c15f0 100644 --- a/profiler/compare_tools/performance_compare.py +++ b/profiler/compare_tools/performance_compare.py @@ -15,13 +15,13 @@ def performance_compare(): npu_path = '' gpu_path = '' if ArgsManager().base_profiling_type == Constant.NPU: - npu_path = ArgsManager().base_profiling + npu_path = ArgsManager().base_profiling.file_path elif ArgsManager().base_profiling_type == Constant.GPU: - gpu_path = ArgsManager().base_profiling + gpu_path = ArgsManager().base_profiling.file_path if ArgsManager().comparison_profiling_type == Constant.NPU: - npu_path = ArgsManager().comparison_profiling + npu_path = ArgsManager().comparison_profiling.file_path elif ArgsManager().comparison_profiling_type == Constant.GPU: - gpu_path = ArgsManager().comparison_profiling + gpu_path = ArgsManager().comparison_profiling.file_path prof_main(npu_path, gpu_path) @@ -47,7 +47,7 @@ def main(): args = parser.parse_args() ArgsManager().init(args) - + performance_compare() dir_path = args.output_path if args.output_path else "./" file_name = "performance_comparison_result_{}.xlsx".format(time.strftime("%Y%m%d%H%M%S", time.localtime(time.time()))) result_file_path = os.path.join(dir_path, file_name) -- Gitee
From cd8a201a9b204f7a7779f496066f246706cb03ad Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 15 Aug 2023 22:50:47 +0800 Subject: [PATCH 23/24] profiling analysis --- .../profiling_analysis/profiling_parse.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/profiler/compare_tools/profiling_analysis/profiling_parse.py b/profiler/compare_tools/profiling_analysis/profiling_parse.py index 3c91a0a72e..282591e0ff 100644 --- a/profiler/compare_tools/profiling_analysis/profiling_parse.py +++ b/profiler/compare_tools/profiling_analysis/profiling_parse.py @@ -63,7 +63,7 @@ def parse_gpu(gpu_path): def parse_npu(npu_path): - npu_parser = NpuProfilingParser(0, False, npu_path) + npu_parser = NpuProfilingParser(0, [], npu_path) npu_parser.parse_npu_csv_events() npu_parser.parse_npu_json_events() return npu_parser.profiling_info @@ -73,16 +73,16 @@ def prof_main(npu_path, gpu_path): if not npu_path or not gpu_path: return - npu_path = {'trace_view': None, 'memory_record': None, 'op_summary': None} + npu_dir = {'trace_view': None, 'memory_record': None, 'op_summary': None} for root, _, files in os.walk(npu_path): for file in files: if file == 'trace_view.json': - npu_path['trace_view'] = os.path.join(root, file) + npu_dir['trace_view'] = os.path.join(root, file) if file == 'memory_record.csv': - npu_path['memory_record'] = os.path.join(root, file) + npu_dir['memory_record'] = os.path.join(root, file) if 'op_summary_' in file: - npu_path['op_summary'] = os.path.join(root, file) - show_table(parse_gpu(gpu_path), parse_npu(npu_path)) + npu_dir['op_summary'] = os.path.join(root, file) + show_table(parse_gpu(gpu_path), parse_npu(npu_dir)) if __name__ == '__main__': -- Gitee
From 1166aeb84951a9b8cfa14205a5adbfd27e5542d1 Mon Sep 17 00:00:00 2001 From: sunboquan Date: Mon, 28 Aug 2023 21:44:52 +0800 Subject: [PATCH 24/24] communication matrix --- .../analysis/communication_analysis.py | 191 +++++++++++++++--- .../cluster_analyse/common_func/constant.py | 2 + 2 files changed, 162 insertions(+), 31 deletions(-)
diff --git a/profiler/cluster_analyse/analysis/communication_analysis.py b/profiler/cluster_analyse/analysis/communication_analysis.py index bc953c4d7f..e69a37d2ef 100644 ---
a/profiler/cluster_analyse/analysis/communication_analysis.py +++ b/profiler/cluster_analyse/analysis/communication_analysis.py @@ -13,31 +13,32 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os +from abc import abstractmethod + from common_func.constant import Constant from collections import defaultdict from common_func.file_manager import FileManager -class CommunicationAnalysis: - CLUSTER_COMMUNICATION_JSON = "cluster_communication.json" +class BaseCommAnalysis: def __init__(self, param: dict): self.collection_path = param.get(Constant.COLLECTION_PATH) self.data_map = param.get(Constant.DATA_MAP) self.collective_group_dict = param.get(Constant.COLLECTIVE_GROUP) - self.communication_ops = param.get(Constant.COMMUNICATION_OPS) self.comm_ops_struct = {} @staticmethod - def combine_size_distribution(op_dict: dict, total_dict: dict): - for size, size_info in op_dict.items(): - total_dict[size][0] += size_info[0] - total_dict[size][1] += size_info[1] + def compute_ratio(dividend: float, divisor: float): + if not divisor: + return 0 + else: + return round(dividend / divisor, 4) + @abstractmethod def run(self): - self.split_op_by_group() - self.combine_ops_total_info() - self.dump_data() + pass def dump_data(self): if not self.comm_ops_struct: @@ -66,14 +67,36 @@ class CommunicationAnalysis: for step_id, communication_ops in group_dict.items(): self.compute_total_info(communication_ops) + +class CommunicationAnalysis(BaseCommAnalysis): + SAVED_JSON = "cluster_communication.json" + + def __init__(self, param: dict): + super().__init__(param) + self.communication_ops = param.get(Constant.COMMUNICATION_OPS) + + @staticmethod + def combine_size_distribution(op_dict: dict, total_dict: dict): + for size, size_info in op_dict.items(): + total_dict[size][0] += size_info[0] + total_dict[size][1] += size_info[1] + + def run(self): + self.split_op_by_group() + self.combine_ops_total_info() + self.dump_data() + def compute_total_info(self, comm_ops: dict): if not comm_ops: return - total_rank_dict = {} + total_rank_dict = defaultdict(lambda: { + Constant.COMMUNICATION_TIME_INFO: defaultdict(float), + Constant.COMMUNICATION_BANDWIDTH_INFO: {} + }) for communication_op, rank_dict in comm_ops.items(): for rank_id, communication_op_info in rank_dict.items(): - total_rank_dict.setdefault(rank_id, {}).setdefault(Constant.COMMUNICATION_TIME_INFO, defaultdict(float)) - total_rank_dict.setdefault(rank_id, {}).setdefault(Constant.COMMUNICATION_BANDWIDTH_INFO, {}) + # total_rank_dict.setdefault(rank_id, {}).setdefault(Constant.COMMUNICATION_TIME_INFO, defaultdict(float)) + # total_rank_dict.setdefault(rank_id, {}).setdefault(Constant.COMMUNICATION_BANDWIDTH_INFO, {}) for com_info, com_info_dict in communication_op_info.items(): if com_info == Constant.COMMUNICATION_TIME_INFO: self.combine_time_info(com_info_dict, total_rank_dict[rank_id][com_info]) @@ -107,24 +130,130 @@ class CommunicationAnalysis: self.combine_size_distribution(value, total_bandwidth_info_dict[transport_type][bandwidth_msg]) def compute_time_ratio(self, total_time_info_dict: dict): - if total_time_info_dict[Constant.WAIT_TIME_MS] + total_time_info_dict[Constant.TRANSIT_TIME_MS] == 0: - total_time_info_dict[Constant.WAIT_TIME_RATIO] = 0 - else: - total_time_info_dict[Constant.WAIT_TIME_RATIO] = \ - round(total_time_info_dict[Constant.WAIT_TIME_MS] / - (total_time_info_dict[Constant.WAIT_TIME_MS] + total_time_info_dict[Constant.TRANSIT_TIME_MS]), 4) - if 
total_time_info_dict[Constant.SYNCHRONIZATION_TIME_MS] + total_time_info_dict[Constant.TRANSIT_TIME_MS] == 0: - total_time_info_dict[Constant.SYNCHRONIZATION_TIME_RATIO] = 0 - else: - total_time_info_dict[Constant.SYNCHRONIZATION_TIME_RATIO] = \ - round(total_time_info_dict[Constant.SYNCHRONIZATION_TIME_MS] / - (total_time_info_dict[Constant.SYNCHRONIZATION_TIME_MS] + - total_time_info_dict[Constant.TRANSIT_TIME_MS]), 4) + total_time_info_dict[Constant.WAIT_TIME_RATIO] = \ + self.compute_ratio(total_time_info_dict.get(Constant.WAIT_TIME_MS, 0), + total_time_info_dict.get(Constant.WAIT_TIME_MS, 0) + + total_time_info_dict.get(Constant.TRANSIT_TIME_MS, 0)) + total_time_info_dict[Constant.SYNCHRONIZATION_TIME_RATIO] = \ + self.compute_ratio(total_time_info_dict.get(Constant.SYNCHRONIZATION_TIME_MS, 0), + total_time_info_dict.get(Constant.SYNCHRONIZATION_TIME_MS, 0) + + total_time_info_dict.get(Constant.TRANSIT_TIME_MS, 0)) def compute_bandwidth_ratio(self, total_bandwidth_info_dict: dict): for transport_type, bandwidth_dict in total_bandwidth_info_dict.items(): - if bandwidth_dict[Constant.TRANSIT_TIME_MS] == 0: - bandwidth_dict[Constant.BANDWIDTH_GB_S] = 0 - else: - bandwidth_dict[Constant.BANDWIDTH_GB_S] = \ - round(bandwidth_dict[Constant.TRANSIT_SIZE_MB] / bandwidth_dict[Constant.TRANSIT_TIME_MS], 4) + bandwidth_dict[Constant.BANDWIDTH_GB_S] = \ + self.compute_ratio(bandwidth_dict.get(Constant.TRANSIT_SIZE_MB, 0), + bandwidth_dict.get(Constant.TRANSIT_TIME_MS, 0)) + + +class CommMatrixAnalysis(BaseCommAnalysis): + SAVED_JSON = "cluster_communication_matrix.json" + + def __init__(self, param: dict): + super().__init__(param) + self.communication_ops = [] + + @staticmethod + def combine_link(link_info_dict: dict, single_link_dict: dict): + link_info_dict[Constant.TRANSPORT_TYPE] = single_link_dict.get(Constant.TRANSPORT_TYPE) + link_info_dict[Constant.TRANSIT_TIME_MS] += single_link_dict.get(Constant.TRANSIT_TIME_MS) + link_info_dict[Constant.TRANSIT_SIZE_MB] += single_link_dict.get(Constant.TRANSIT_SIZE_MB) + + def run(self): + self.load_communication_matrix_data() + self.split_op_by_group() + self.combine_ops_total_info() + self.dump_data() + + def load_communication_matrix_data(self): + rank_comm_dict = {} + for rank_id, profiling_dir_path in self.data_map.items(): + comm_dir = os.path.join(profiling_dir_path, Constant.SINGLE_OUTPUT, Constant.COMM_MATRIX_JSON) + rank_comm_dict[rank_id] = FileManager.read_json_file(comm_dir) + if not rank_comm_dict.get(rank_id): + print(f"rank {rank_id} does not have a valid communication_matrix.json.") + self.construct_matrix_data(rank_comm_dict) + + def construct_matrix_data(self, rank_comm_dict: dict): + for rank_id, rank_id_dict in rank_comm_dict.items(): + for step_id, step_id_dict in rank_id_dict.items(): + self.add_comm_ops(rank_id, step_id, step_id_dict) + + def add_comm_ops(self, rank_id: int, step_id: int, step_id_dict: dict): + for comm_op_type, comm_dict in step_id_dict.items(): + if comm_op_type != Constant.COLLECTIVE and comm_op_type != Constant.P2P: + print("Unknown communication operators type!") + continue + for op_name, op_link_info in comm_dict.items(): + if op_name.startswith('Total'): + continue + group_name = op_name.split('@')[-1] + self.communication_ops.append({ + Constant.RANK_ID: rank_id, + Constant.STEP_ID: step_id, + Constant.COMM_OP_TYPE: comm_op_type, + Constant.COMM_OP_NAME: op_name, + Constant.GROUP_NAME: group_name, + Constant.COMM_OP_INFO: op_link_info + }) + + def compute_total_info(self, step_dict: dict):
self.merge_same_links(step_dict) + self.combine_link_info(step_dict) + + def merge_same_links(self, step_dict: dict): + def process_link_key(): + for link_key in rank_dict: + if '-' not in link_key: + print(f"{op_name} has an invalid link key {link_key}!") + break + src_rank = link_key.split('-')[0] + dst_rank = link_key.split('-')[1] + if src_rank == dst_rank: + if src_rank not in project_local_global_rank_map: + project_local_global_rank_map[src_rank] = rank_id + else: + print("Duplicate local-to-global rank mapping within the same communication group!") + else: + self.combine_link(link_info[link_key], rank_dict[link_key]) + + def convert_local_to_global_rank(): + tmp_link = {} + for link_key, link_dict in link_info.items(): + src_rank = link_key.split('-')[0] + dst_rank = link_key.split('-')[1] + src_rank = project_local_global_rank_map[src_rank] \ + if src_rank in project_local_global_rank_map else src_rank + dst_rank = project_local_global_rank_map[dst_rank] \ + if dst_rank in project_local_global_rank_map else dst_rank + link_dict[Constant.BANDWIDTH_GB_S] = \ + self.compute_ratio(link_dict.get(Constant.TRANSIT_SIZE_MB, 0), + link_dict.get(Constant.TRANSIT_TIME_MS, 0)) + tmp_link[f"{src_rank}-{dst_rank}"] = link_dict + return tmp_link + + project_local_global_rank_map = dict() + for op_name, op_dict in step_dict.items(): + link_info = defaultdict(lambda: { + Constant.TRANSPORT_TYPE: '', + Constant.TRANSIT_TIME_MS: 0, + Constant.TRANSIT_SIZE_MB: 0 + }) + for rank_id, rank_dict in op_dict.items(): + process_link_key() + step_dict[op_name] = convert_local_to_global_rank() + + def combine_link_info(self, step_dict: dict): + total_op_info = defaultdict(lambda: { + Constant.TRANSPORT_TYPE: '', + Constant.TRANSIT_TIME_MS: 0, + Constant.TRANSIT_SIZE_MB: 0 + }) + for op_name, op_dict in step_dict.items(): + for link_key, link_dict in op_dict.items(): + self.combine_link(total_op_info[link_key], link_dict) + for link_key, link_dict in total_op_info.items(): + link_dict[Constant.BANDWIDTH_GB_S] = \ + self.compute_ratio(link_dict.get(Constant.TRANSIT_SIZE_MB, 0), + link_dict.get(Constant.TRANSIT_TIME_MS, 0)) +
diff --git a/profiler/cluster_analyse/common_func/constant.py b/profiler/cluster_analyse/common_func/constant.py index 84d4a9e054..8910099c7f 100644 --- a/profiler/cluster_analyse/common_func/constant.py +++ b/profiler/cluster_analyse/common_func/constant.py @@ -20,6 +20,7 @@ class Constant(object): CLUSTER_ANALYSIS_OUTPUT = "cluster_analysis_output" SINGLE_OUTPUT = "ASCEND_PROFILER_OUTPUT" COMM_JSON = "communication.json" + COMM_MATRIX_JSON = "communication_matrix.json" STEP_TIME_CSV = "step_trace_time.csv" # file authority @@ -60,6 +61,7 @@ class Constant(object): COMMUNICATION_OPS = "communication_ops" COLLECTION_PATH = "collection_path" COMMUNICATION_GROUP = "communication_group" + TRANSPORT_TYPE = "Transport Type" # step time RANK = 'rank' -- Gitee
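To close, a hedged usage sketch for the PATCH 24/24 matrix helpers: how combine_link accumulates per-link transit totals and how the two-argument compute_ratio turns them into bandwidth. Only the constant names appear in the diffs above; the string literals (apart from TRANSPORT_TYPE) and the sample numbers are assumptions for illustration.

    from collections import defaultdict

    TRANSPORT_TYPE = "Transport Type"     # shown in constant.py above
    TRANSIT_TIME_MS = "Transit Time(ms)"  # assumed literal
    TRANSIT_SIZE_MB = "Transit Size(MB)"  # assumed literal
    BANDWIDTH_GB_S = "Bandwidth(GB/s)"    # assumed literal

    def compute_ratio(dividend: float, divisor: float):
        # mirrors BaseCommAnalysis.compute_ratio: zero-divisor guard plus rounding
        return 0 if not divisor else round(dividend / divisor, 4)

    def combine_link(link_info_dict: dict, single_link_dict: dict):
        # mirrors CommMatrixAnalysis.combine_link: sum time and size per link
        link_info_dict[TRANSPORT_TYPE] = single_link_dict.get(TRANSPORT_TYPE)
        link_info_dict[TRANSIT_TIME_MS] += single_link_dict.get(TRANSIT_TIME_MS)
        link_info_dict[TRANSIT_SIZE_MB] += single_link_dict.get(TRANSIT_SIZE_MB)

    link_info = defaultdict(lambda: {TRANSPORT_TYPE: '', TRANSIT_TIME_MS: 0, TRANSIT_SIZE_MB: 0})
    for record in ({TRANSPORT_TYPE: 'HCCS', TRANSIT_TIME_MS: 2.0, TRANSIT_SIZE_MB: 64.0},
                   {TRANSPORT_TYPE: 'HCCS', TRANSIT_TIME_MS: 3.0, TRANSIT_SIZE_MB: 96.0}):
        combine_link(link_info['0-1'], record)
    link_info['0-1'][BANDWIDTH_GB_S] = compute_ratio(link_info['0-1'][TRANSIT_SIZE_MB],
                                                     link_info['0-1'][TRANSIT_TIME_MS])
    # 160.0 MB over 5.0 ms -> 32.0 (MB/ms, i.e. GB/s with 1000-based units)

Passing size and time as two separate arguments keeps the zero-divisor guard inside compute_ratio, which is why the compute_bandwidth_ratio call above takes a comma rather than a pre-divided quotient.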