diff --git a/profiler/cluster_analyse/analysis/communication_analysis.py b/profiler/cluster_analyse/analysis/communication_analysis.py
index bc953c4d7f7784569644f8aa93ff611e19e41556..e69a37d2ef4c1164e5757525237c7a2dbbcfad07 100644
--- a/profiler/cluster_analyse/analysis/communication_analysis.py
+++ b/profiler/cluster_analyse/analysis/communication_analysis.py
@@ -13,31 +13,32 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import os
+from abc import abstractmethod
+
 from common_func.constant import Constant
 from collections import defaultdict
 from common_func.file_manager import FileManager
 
 
-class CommunicationAnalysis:
-    CLUSTER_COMMUNICATION_JSON = "cluster_communication.json"
+class BaseCommAnalysis:
 
     def __init__(self, param: dict):
         self.collection_path = param.get(Constant.COLLECTION_PATH)
         self.data_map = param.get(Constant.DATA_MAP)
         self.collective_group_dict = param.get(Constant.COLLECTIVE_GROUP)
-        self.communication_ops = param.get(Constant.COMMUNICATION_OPS)
         self.comm_ops_struct = {}
 
     @staticmethod
-    def combine_size_distribution(op_dict: dict, total_dict: dict):
-        for size, size_info in op_dict.items():
-            total_dict[size][0] += size_info[0]
-            total_dict[size][1] += size_info[1]
+    def compute_ratio(dividend: float, divisor: float):
+        if not divisor:
+            return 0
+        else:
+            return round(dividend / divisor, 4)
 
+    @abstractmethod
     def run(self):
-        self.split_op_by_group()
-        self.combine_ops_total_info()
-        self.dump_data()
+        pass
 
     def dump_data(self):
         if not self.comm_ops_struct:
@@ -66,14 +67,34 @@ class CommunicationAnalysis:
         for step_id, communication_ops in group_dict.items():
             self.compute_total_info(communication_ops)
 
+
+class CommunicationAnalysis(BaseCommAnalysis):
+    SAVED_JSON = "cluster_communication.json"
+
+    def __init__(self, param: dict):
+        super().__init__(param)
+        self.communication_ops = param.get(Constant.COMMUNICATION_OPS)
+
+    @staticmethod
+    def combine_size_distribution(op_dict: dict, total_dict: dict):
+        for size, size_info in op_dict.items():
+            total_dict[size][0] += size_info[0]
+            total_dict[size][1] += size_info[1]
+
+    def run(self):
+        self.split_op_by_group()
+        self.combine_ops_total_info()
+        self.dump_data()
+
     def compute_total_info(self, comm_ops: dict):
         if not comm_ops:
             return
-        total_rank_dict = {}
+        total_rank_dict = defaultdict(lambda: {
+            Constant.COMMUNICATION_TIME_INFO: defaultdict(float),
+            Constant.COMMUNICATION_BANDWIDTH_INFO: {}
+        })
         for communication_op, rank_dict in comm_ops.items():
             for rank_id, communication_op_info in rank_dict.items():
-                total_rank_dict.setdefault(rank_id, {}).setdefault(Constant.COMMUNICATION_TIME_INFO, defaultdict(float))
-                total_rank_dict.setdefault(rank_id, {}).setdefault(Constant.COMMUNICATION_BANDWIDTH_INFO, {})
                 for com_info, com_info_dict in communication_op_info.items():
                     if com_info == Constant.COMMUNICATION_TIME_INFO:
                         self.combine_time_info(com_info_dict, total_rank_dict[rank_id][com_info])
@@ -107,24 +128,134 @@ class CommunicationAnalysis:
             self.combine_size_distribution(value, total_bandwidth_info_dict[transport_type][bandwidth_msg])
 
     def compute_time_ratio(self, total_time_info_dict: dict):
-        if total_time_info_dict[Constant.WAIT_TIME_MS] + total_time_info_dict[Constant.TRANSIT_TIME_MS] == 0:
-            total_time_info_dict[Constant.WAIT_TIME_RATIO] = 0
-        else:
-            total_time_info_dict[Constant.WAIT_TIME_RATIO] = \
-                round(total_time_info_dict[Constant.WAIT_TIME_MS] /
-                      (total_time_info_dict[Constant.WAIT_TIME_MS] + total_time_info_dict[Constant.TRANSIT_TIME_MS]), 4)
-        if total_time_info_dict[Constant.SYNCHRONIZATION_TIME_MS] + total_time_info_dict[Constant.TRANSIT_TIME_MS] == 0:
-            total_time_info_dict[Constant.SYNCHRONIZATION_TIME_RATIO] = 0
-        else:
-            total_time_info_dict[Constant.SYNCHRONIZATION_TIME_RATIO] = \
-                round(total_time_info_dict[Constant.SYNCHRONIZATION_TIME_MS] /
-                      (total_time_info_dict[Constant.SYNCHRONIZATION_TIME_MS] +
-                       total_time_info_dict[Constant.TRANSIT_TIME_MS]), 4)
+        total_time_info_dict[Constant.WAIT_TIME_RATIO] = \
+            self.compute_ratio(total_time_info_dict.get(Constant.WAIT_TIME_MS, 0),
+                               total_time_info_dict.get(Constant.WAIT_TIME_MS, 0) +
+                               total_time_info_dict.get(Constant.TRANSIT_TIME_MS, 0))
+        total_time_info_dict[Constant.SYNCHRONIZATION_TIME_RATIO] = \
+            self.compute_ratio(total_time_info_dict.get(Constant.SYNCHRONIZATION_TIME_MS, 0),
+                               total_time_info_dict.get(Constant.SYNCHRONIZATION_TIME_MS, 0) +
+                               total_time_info_dict.get(Constant.TRANSIT_TIME_MS, 0))
 
     def compute_bandwidth_ratio(self, total_bandwidth_info_dict: dict):
         for transport_type, bandwidth_dict in total_bandwidth_info_dict.items():
-            if bandwidth_dict[Constant.TRANSIT_TIME_MS] == 0:
-                bandwidth_dict[Constant.BANDWIDTH_GB_S] = 0
-            else:
-                bandwidth_dict[Constant.BANDWIDTH_GB_S] = \
-                    round(bandwidth_dict[Constant.TRANSIT_SIZE_MB] / bandwidth_dict[Constant.TRANSIT_TIME_MS], 4)
+            # bandwidth = transit size / transit time; compute_ratio guards divide-by-zero
+            bandwidth_dict[Constant.BANDWIDTH_GB_S] = \
+                self.compute_ratio(bandwidth_dict.get(Constant.TRANSIT_SIZE_MB, 0),
+                                   bandwidth_dict.get(Constant.TRANSIT_TIME_MS, 0))
+
+
+class CommMatrixAnalysis(BaseCommAnalysis):
+    # Aggregates every rank's communication_matrix.json into a cluster-wide matrix.
+    SAVED_JSON = "cluster_communication_matrix.json"
+
+    def __init__(self, param: dict):
+        super().__init__(param)
+        # op records gathered from every rank; must be a list because add_comm_ops appends
+        self.communication_ops = []
+
+    @staticmethod
+    def combine_link(link_info_dict: dict, single_link_dict: dict):
+        link_info_dict[Constant.TRANSPORT_TYPE] = single_link_dict.get(Constant.TRANSPORT_TYPE)
+        link_info_dict[Constant.TRANSIT_TIME_MS] += single_link_dict.get(Constant.TRANSIT_TIME_MS, 0)
+        link_info_dict[Constant.TRANSIT_SIZE_MB] += single_link_dict.get(Constant.TRANSIT_SIZE_MB, 0)
+
+    def run(self):
+        self.load_communication_matrix_data()
+        self.split_op_by_group()
+        self.combine_ops_total_info()
+        self.dump_data()
+
+    def load_communication_matrix_data(self):
+        rank_comm_dict = {}
+        for rank_id, profiling_dir_path in self.data_map.items():
+            comm_dir = os.path.join(profiling_dir_path, Constant.SINGLE_OUTPUT, Constant.COMM_MATRIX_JSON)
+            rank_comm_dict[rank_id] = FileManager.read_json_file(comm_dir)
+            if not rank_comm_dict.get(rank_id):
+                print(f"rank {rank_id} does not have a valid communication_matrix.json.")
+        self.construct_matrix_data(rank_comm_dict)
+
+    def construct_matrix_data(self, rank_comm_dict: dict):
+        for rank_id, rank_id_dict in rank_comm_dict.items():
+            for step_id, step_id_dict in rank_id_dict.items():
+                self.add_comm_ops(rank_id, step_id, step_id_dict)
+
+    def add_comm_ops(self, rank_id: int, step_id: int, step_id_dict: dict):
+        for comm_op_type, comm_dict in step_id_dict.items():
+            if comm_op_type != Constant.COLLECTIVE and comm_op_type != Constant.P2P:
+                print("Unknown communication operators type!")
+                continue
+            for op_name, op_link_info in comm_dict.items():
+                if op_name.startswith('Total'):
+                    continue
+                group_name = op_name.split('@')[-1]
+                self.communication_ops.append({
+                    Constant.RANK_ID: rank_id,
+                    Constant.STEP_ID: step_id,
+                    Constant.COMM_OP_TYPE: comm_op_type,
+                    Constant.COMM_OP_NAME: op_name,
+                    Constant.GROUP_NAME: group_name,
+                    Constant.COMM_OP_INFO: op_link_info
+                })
+
+    def compute_total_info(self, step_dict: dict):
+        self.merge_same_links(step_dict)
+        self.combine_link_info(step_dict)
+
+    def merge_same_links(self, step_dict: dict):
+        def process_link_key():
+            for link_key in rank_dict:
+                if '-' not in link_key:
+                    print(f"{op_name} has an invalid link key {link_key}!")
+                    break
+                src_rank = link_key.split('-')[0]
+                dst_rank = link_key.split('-')[1]
+                if src_rank == dst_rank:
+                    if src_rank not in project_local_global_rank_map:
+                        project_local_global_rank_map[src_rank] = rank_id
+                    else:
+                        print(f"In the same communication group, local ranks projecting to global ranks repeat!")
+                else:
+                    self.combine_link(link_info[link_key], rank_dict[link_key])
+
+        def convert_local_to_global_rank():
+            tmp_link = {}
+            for link_key, link_dict in link_info.items():
+                src_rank = link_key.split('-')[0]
+                dst_rank = link_key.split('-')[1]
+                src_rank = project_local_global_rank_map[src_rank] \
+                    if src_rank in project_local_global_rank_map else src_rank
+                dst_rank = project_local_global_rank_map[dst_rank] \
+                    if dst_rank in project_local_global_rank_map else dst_rank
+                link_dict[Constant.BANDWIDTH_GB_S] = \
+                    self.compute_ratio(link_dict.get(Constant.TRANSIT_SIZE_MB, 0),
+                                       link_dict.get(Constant.TRANSIT_TIME_MS, 0))
+                tmp_link[f"{src_rank}-{dst_rank}"] = link_dict
+            return tmp_link
+
+        # maps a local rank (seen in self-links such as "0-0") to its global rank id
+        project_local_global_rank_map = dict()
+        for op_name, op_dict in step_dict.items():
+            link_info = defaultdict(lambda: {
+                Constant.TRANSPORT_TYPE: '',
+                Constant.TRANSIT_TIME_MS: 0,
+                Constant.TRANSIT_SIZE_MB: 0
+            })
+            for rank_id, rank_dict in op_dict.items():
+                process_link_key()
+            step_dict[op_name] = convert_local_to_global_rank()
+
+    def combine_link_info(self, step_dict: dict):
+        total_op_info = defaultdict(lambda: {
+            Constant.TRANSPORT_TYPE: '',
+            Constant.TRANSIT_TIME_MS: 0,
+            Constant.TRANSIT_SIZE_MB: 0
+        })
+        for op_name, op_dict in step_dict.items():
+            for link_key, link_dict in op_dict.items():
+                self.combine_link(total_op_info[link_key], link_dict)
+        for link_key, link_dict in total_op_info.items():
+            link_dict[Constant.BANDWIDTH_GB_S] = \
+                self.compute_ratio(link_dict.get(Constant.TRANSIT_SIZE_MB, 0),
+                                   link_dict.get(Constant.TRANSIT_TIME_MS, 0))
+
diff --git a/profiler/cluster_analyse/common_func/constant.py b/profiler/cluster_analyse/common_func/constant.py
index 84d4a9e0545e85d6d7d22cc2a3f30bdd1300bb46..8910099c7f845219fb9ec12e0dd1e916d007b2a1 100644
--- a/profiler/cluster_analyse/common_func/constant.py
+++ b/profiler/cluster_analyse/common_func/constant.py
@@ -20,6 +20,7 @@ class Constant(object):
     CLUSTER_ANALYSIS_OUTPUT = "cluster_analysis_output"
     SINGLE_OUTPUT = "ASCEND_PROFILER_OUTPUT"
     COMM_JSON = "communication.json"
+    COMM_MATRIX_JSON = "communication_matrix.json"
     STEP_TIME_CSV = "step_trace_time.csv"
 
     # file authority
@@ -60,6 +61,7 @@ class Constant(object):
     COMMUNICATION_OPS = "communication_ops"
     COLLECTION_PATH = "collection_path"
     COMMUNICATION_GROUP = "communication_group"
+    TRANSPORT_TYPE = "Transport Type"
 
     # step time
     RANK = 'rank'
diff --git a/profiler/merge_profiling_timeline/rank_id.py b/profiler/merge_profiling_timeline/rank_id.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391