diff --git a/profiler/cluster_analyse/analysis/analysis_facade.py b/profiler/cluster_analyse/analysis/analysis_facade.py index 0b870bbaafa6483bf2cfde49971d79106c07aa23..06be6002e1e075645dd21cd1328505829a9b3305 100644 --- a/profiler/cluster_analyse/analysis/analysis_facade.py +++ b/profiler/cluster_analyse/analysis/analysis_facade.py @@ -14,13 +14,14 @@ # limitations under the License. from multiprocessing import Process -from analysis.communication.comm_analysis_generator import CommunicationAnalysisGenerator -from analysis.communication_matrix.comm_matrix_generator import CommMatrixAnalysisGenerator + +from analysis.communication_analysis import CommunicationAnalysis +from analysis.comm_matrix_analysis import CommMatrixAnalysis from analysis.step_trace_time_analysis import StepTraceTimeAnalysis class AnalysisFacade: - analysis_module = {CommunicationAnalysisGenerator, StepTraceTimeAnalysis, CommMatrixAnalysisGenerator} + analysis_module = {CommunicationAnalysis, StepTraceTimeAnalysis, CommMatrixAnalysis} def __init__(self, params: dict): self.params = params diff --git a/profiler/cluster_analyse/analysis/base_analysis_json.py b/profiler/cluster_analyse/analysis/base_analysis.py similarity index 86% rename from profiler/cluster_analyse/analysis/base_analysis_json.py rename to profiler/cluster_analyse/analysis/base_analysis.py index 3df54b0ae2a742f966d8714ea5b850b0999091a7..cc803813dda4a535c529a935c1b42dae197855c9 100644 --- a/profiler/cluster_analyse/analysis/base_analysis_json.py +++ b/profiler/cluster_analyse/analysis/base_analysis.py @@ -1,16 +1,19 @@ from abc import abstractmethod from common_func.constant import Constant +from utils.data_transfer_adapter import DataTransferAdapter from common_func.file_manager import FileManager -class BaseAnalysisJson: +class BaseAnalysis: def __init__(self, param: dict): self.collection_path = param.get(Constant.COLLECTION_PATH) self.data_map = param.get(Constant.DATA_MAP) + self.data_type = param.get(Constant.DATA_TYPE) self.communication_ops = [] self.collective_group_dict = param.get(Constant.COMM_DATA_DICT, {}).get(Constant.COLLECTIVE_GROUP) self.comm_ops_struct = {} + self.adapter = DataTransferAdapter() @staticmethod def compute_ratio(dividend: float, divisor: float): @@ -40,6 +43,16 @@ class BaseAnalysisJson: if not self.comm_ops_struct: print("[WARNING] There is no final comm ops data generated") return + if self.data_type == Constant.TEXT: + self.dump_json() + else: + self.dump_db() + + @abstractmethod + def dump_db(self): + pass + + def dump_json(self): output_comm_data = {} for key in self.comm_ops_struct: output_comm_data[str(key)] = self.comm_ops_struct.get(key) diff --git a/profiler/cluster_analyse/analysis/communication_matrix/comm_matrix_analysis_json.py b/profiler/cluster_analyse/analysis/comm_matrix_analysis.py similarity index 79% rename from profiler/cluster_analyse/analysis/communication_matrix/comm_matrix_analysis_json.py rename to profiler/cluster_analyse/analysis/comm_matrix_analysis.py index 7baca7e9283e7471b91be99d1b4b8ac828a80fe2..8dc04471fe0a164fc859e51597d41028523f7a32 100644 --- a/profiler/cluster_analyse/analysis/communication_matrix/comm_matrix_analysis_json.py +++ b/profiler/cluster_analyse/analysis/comm_matrix_analysis.py @@ -1,11 +1,14 @@ +import os from collections import defaultdict -from analysis.base_analysis_json import BaseAnalysisJson +from analysis.base_analysis import BaseAnalysis from common_func.constant import Constant +from common_func.db_manager import DBManager -class 
CommMatrixAnalysisJson(BaseAnalysisJson): +class CommMatrixAnalysis(BaseAnalysis): SAVED_JSON = "cluster_communication_matrix.json" + COMMUNICATION_MATRIX_TABLE = "ClusterCommAnalyzerMatrix" def __init__(self, param: dict): super().__init__(param) @@ -25,6 +28,19 @@ class CommMatrixAnalysisJson(BaseAnalysisJson): self.combine_ops_total_info() self.dump_data() + def dump_db(self): + res_comm_matrix = self.adapter.transfer_matrix_from_json_to_db(self.comm_ops_struct) + output_path = os.path.join(self.collection_path, Constant.CLUSTER_ANALYSIS_OUTPUT) + result_db = os.path.join(output_path, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER) + DBManager.create_tables(result_db, self.COMMUNICATION_MATRIX_TABLE) + conn, cursor = DBManager.create_connect_db(result_db) + if res_comm_matrix: + res_matrix_value = [list(data.values()) for data in res_comm_matrix] + sql = "insert into {} values ({value})".format(self.COMMUNICATION_MATRIX_TABLE, + value="?," * (len(res_matrix_value[0]) - 1) + "?") + DBManager.executemany_sql(conn, sql, res_matrix_value) + DBManager.destroy_db_connect(conn, cursor) + def compute_total_info(self, step_dict: dict): self.merge_same_links(step_dict) self.combine_link_info(step_dict) diff --git a/profiler/cluster_analyse/analysis/communication/comm_analysis_generator.py b/profiler/cluster_analyse/analysis/communication/comm_analysis_generator.py deleted file mode 100644 index 4b737b5da4f35a0d8363de5869d9e269d85859fa..0000000000000000000000000000000000000000 --- a/profiler/cluster_analyse/analysis/communication/comm_analysis_generator.py +++ /dev/null @@ -1,17 +0,0 @@ -from analysis.communication.communication_analysis_db import CommunicationAnalysisDB -from analysis.communication.communication_analysis_json import CommunicationAnalysisJson -from common_func.constant import Constant - - -class CommunicationAnalysisGenerator: - - GROUP_MAP = { - Constant.DB: CommunicationAnalysisDB, - Constant.TEXT: CommunicationAnalysisJson - } - - def __init__(self, params: dict): - self.generator = self.GROUP_MAP.get(params.get(Constant.DATA_TYPE))(params) - - def run(self): - self.generator.run() diff --git a/profiler/cluster_analyse/analysis/communication/communication_analysis_db.py b/profiler/cluster_analyse/analysis/communication/communication_analysis_db.py deleted file mode 100644 index ff371cf7a8878d3384e4321d0ab8922018c6325c..0000000000000000000000000000000000000000 --- a/profiler/cluster_analyse/analysis/communication/communication_analysis_db.py +++ /dev/null @@ -1,168 +0,0 @@ -import os - -from analysis.base_analysis_json import BaseAnalysisJson -from common_func.db_manager import DBManager -from common_func.constant import Constant -from common_func.table_constant import TableConstant - - -class CommunicationAnalysisDB: - COMMUNICATION_BANDWIDTH_TABLE = "ClusterCommAnalyzerBandwidth" - COMMUNICATION_TIME_TABLE = "ClusterCommAnalyzerTime" - TIME_EXTENSION = "time" - RANK_BAND_TYPE = "{}-{}" - - def __init__(self, params: any): - self.collection_path = params.get(Constant.COLLECTION_PATH) - self.communication_time_info = params.get(Constant.COMM_DATA_DICT, {}).get(Constant.COMMUNICATION_TIME_INFO) - self.communication_bandwidth_info = params.get(Constant.COMM_DATA_DICT, {}).get( - Constant.COMMUNICATION_BANDWIDTH_INFO) - self.collective_group_dict = params.get(Constant.COMM_DATA_DICT, {}).get(Constant.COLLECTIVE_GROUP) - self.comm_time_struct = {} - self.comm_bandwidth_struct = {} - self.res_comm_time = [] - self.res_comm_bandwidth = [] - - def run(self): - if not 
self.communication_time_info and not self.communication_bandwidth_info: - return - self.split_and_add_rank_set(self.communication_time_info, self.comm_time_struct) - self.split_and_add_rank_set(self.communication_bandwidth_info, self.comm_bandwidth_struct) - self.compute_total_info() - self.dump_data() - - def dump_data(self): - if not self.res_comm_time and not self.res_comm_bandwidth: - print("[WARNING] There is no final communication data generated") - return - output_path = os.path.join(self.collection_path, Constant.CLUSTER_ANALYSIS_OUTPUT) - result_db = os.path.join(output_path, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER) - DBManager.create_tables(result_db, self.COMMUNICATION_TIME_TABLE, self.COMMUNICATION_BANDWIDTH_TABLE) - res_time, res_bandwidth = [], [] - conn, cursor = DBManager.create_connect_db(result_db) - for data in self.res_comm_time: - res_time.append([data[TableConstant.RANK_SET], data[TableConstant.STEP], data[TableConstant.RANK_ID], - data[TableConstant.HCCL_OP_NAME], data[TableConstant.GROUP_NAME], - data[TableConstant.START_TIMESTAMP], data[TableConstant.ELAPSED_TIME], - data[TableConstant.TRANSIT_TIME], data[TableConstant.WAIT_TIME], - data[TableConstant.SYNCHRONIZATION_TIME], data[TableConstant.IDLE_TIME], - data[TableConstant.SYNCHRONIZATION_TIME_RATIO], data[TableConstant.WAIT_TIME_RATIO]]) - if res_time: - sql = "insert into {} values ({value})".format(self.COMMUNICATION_TIME_TABLE, - value="?," * (len(res_time[0]) - 1) + "?") - DBManager.executemany_sql(conn, sql, res_time) - for data in self.res_comm_bandwidth: - res_bandwidth.append([data[TableConstant.RANK_SET], data[TableConstant.STEP], data[TableConstant.RANK_ID], - data[TableConstant.HCCL_OP_NAME], data[TableConstant.GROUP_NAME], - data[TableConstant.TRANSPORT_TYPE], data[TableConstant.TRANSIT_SIZE], - data[TableConstant.TRANSIT_TIME], data[TableConstant.BANDWIDTH], - data[TableConstant.LARGE_PACKET_RATIO], data[TableConstant.PACKAGE_SIZE], - data[TableConstant.COUNT], data[TableConstant.TOTAL_DURATION]]) - if res_bandwidth: - sql = "insert into {} values ({value})".format(self.COMMUNICATION_BANDWIDTH_TABLE, - value="?," * (len(res_bandwidth[0]) - 1) + "?") - DBManager.executemany_sql(conn, sql, res_bandwidth) - DBManager.destroy_db_connect(conn, cursor) - - def split_and_add_rank_set(self, data_list, res_dict): - for data in data_list: - if data[TableConstant.TYPE] == Constant.P2P: - rank_tuple = Constant.P2P - else: - rank_tuple = tuple(self.collective_group_dict.get(data[TableConstant.GROUP_NAME], [])) - res_dict.setdefault(rank_tuple, {}).setdefault(data[TableConstant.STEP], []).append(data) - - def compute_total_info(self): - for rank_tuple, op_dict in self.comm_time_struct.items(): - if rank_tuple != Constant.P2P: - for step, data_list in op_dict.items(): - self.compute_rank_set_total_time_info(data_list, rank_tuple) - else: - rank_set = set() - for step, data_list in op_dict.items(): - rank_set.add(data[TableConstant.RANK_ID] for data in data_list) - for step, data_list in op_dict.items(): - self.compute_rank_set_total_time_info(data_list, rank_set, True) - for rank_tuple, op_dict in self.comm_bandwidth_struct.items(): - for step, data_list in op_dict.items(): - if rank_tuple != Constant.P2P: - self.compute_rank_set_total_bandwidth_info(data_list, rank_tuple) - else: - self.compute_rank_set_total_bandwidth_info(data_list, rank_tuple, True) - - def compute_rank_set_total_bandwidth_info(self, data_list, rank_tuple, is_p2p=False): - if not data_list: - return - data_dict = {} - rank_tuple = "(" + 
",".join(str(i) for i in rank_tuple) + ")" if not is_p2p else Constant.P2P - for data in data_list: - data[TableConstant.RANK_SET] = rank_tuple - rank_band_type = self.RANK_BAND_TYPE.format(data[TableConstant.RANK_ID], - data[TableConstant.TRANSPORT_TYPE]) - data_dict.setdefault(rank_band_type, []).append(data) - self.res_comm_bandwidth.append(data) - for rank_band_type, bandwidth_list in data_dict.items(): - package_set = set() - for data in bandwidth_list: - package_set.add(data[TableConstant.PACKAGE_SIZE]) - for package in package_set: - total_comm_bandwidth_info = dict() - for data in bandwidth_list: - self.compute_bandwidth(total_comm_bandwidth_info, data, package) - bandwidth = BaseAnalysisJson.compute_ratio(total_comm_bandwidth_info.get(TableConstant.TRANSIT_SIZE), - total_comm_bandwidth_info.get(TableConstant.TRANSIT_TIME)) - total_comm_bandwidth_info[TableConstant.BANDWIDTH] = bandwidth - total_comm_bandwidth_info[TableConstant.PACKAGE_SIZE] = package - total_comm_bandwidth_info[TableConstant.HCCL_OP_NAME] = Constant.TOTAL_OP_INFO - total_comm_bandwidth_info[TableConstant.GROUP_NAME] = "" - total_comm_bandwidth_info[TableConstant.LARGE_PACKET_RATIO] = 0.0 - self.res_comm_bandwidth.append(total_comm_bandwidth_info) - - def compute_bandwidth(self, res_dict, data_dict, package): - for key in data_dict.keys(): - if key in [TableConstant.TRANSIT_TIME, TableConstant.TRANSIT_SIZE]: - if key not in res_dict.keys(): - res_dict[key] = 0.0 - res_dict[key] += data_dict[key] - elif key in [TableConstant.COUNT, TableConstant.TOTAL_DURATION]: - if data_dict[TableConstant.PACKAGE_SIZE] == package: - if key not in res_dict.keys(): - res_dict[key] = 0.0 - res_dict[key] += data_dict[key] - else: - res_dict[key] = data_dict[key] - - def compute_time(self, res_dict, data_dict, dict_key): - if dict_key.endswith(self.TIME_EXTENSION): - if dict_key not in res_dict.keys(): - res_dict[dict_key] = 0.0 - res_dict[dict_key] += data_dict[dict_key] - else: - res_dict[dict_key] = data_dict[dict_key] - - def compute_rank_set_total_time_info(self, data_list: list, rank_tuple: any, is_p2p: bool = False): - if not data_list: - return - rank_set = "(" + ",".join(str(i) for i in rank_tuple) + ")" if not is_p2p else Constant.P2P - for rank_id in rank_tuple: - total_comm_time_info = dict() - for data in data_list: - if data[TableConstant.RANK_ID] == rank_id: - data[TableConstant.RANK_SET] = rank_set - data[TableConstant.SYNCHRONIZATION_TIME_RATIO] = 0.0 - data[TableConstant.WAIT_TIME_RATIO] = 0.0 - for key, value in data.items(): - self.compute_time(total_comm_time_info, data, key) - syn_ratio = BaseAnalysisJson.compute_ratio(total_comm_time_info.get(TableConstant.SYNCHRONIZATION_TIME), - total_comm_time_info.get(TableConstant.SYNCHRONIZATION_TIME) + - total_comm_time_info.get(TableConstant.TRANSIT_TIME)) - wait_time_ratio = BaseAnalysisJson.compute_ratio(total_comm_time_info.get(TableConstant.WAIT_TIME), - total_comm_time_info.get(TableConstant.WAIT_TIME) + - total_comm_time_info.get(TableConstant.TRANSIT_TIME)) - total_comm_time_info[TableConstant.HCCL_OP_NAME] = Constant.TOTAL_OP_INFO - total_comm_time_info[TableConstant.GROUP_NAME] = "" - total_comm_time_info[TableConstant.START_TIMESTAMP] = 0.0 - total_comm_time_info[TableConstant.WAIT_TIME_RATIO] = wait_time_ratio - total_comm_time_info[TableConstant.SYNCHRONIZATION_TIME_RATIO] = syn_ratio - self.res_comm_time.append(total_comm_time_info) - self.res_comm_time.extend(data_list) diff --git 
a/profiler/cluster_analyse/analysis/communication/communication_analysis_json.py b/profiler/cluster_analyse/analysis/communication_analysis.py similarity index 76% rename from profiler/cluster_analyse/analysis/communication/communication_analysis_json.py rename to profiler/cluster_analyse/analysis/communication_analysis.py index 7fa680fe56ceb93cb6e277843107fb458b1f8807..3f0a9b417e211b124b052cb5c5534f2fdbe5302e 100644 --- a/profiler/cluster_analyse/analysis/communication/communication_analysis_json.py +++ b/profiler/cluster_analyse/analysis/communication_analysis.py @@ -1,11 +1,15 @@ +import os from collections import defaultdict -from analysis.base_analysis_json import BaseAnalysisJson +from analysis.base_analysis import BaseAnalysis from common_func.constant import Constant +from common_func.db_manager import DBManager -class CommunicationAnalysisJson(BaseAnalysisJson): +class CommunicationAnalysis(BaseAnalysis): SAVED_JSON = "cluster_communication.json" + COMMUNICATION_BANDWIDTH_TABLE = "ClusterCommAnalyzerBandwidth" + COMMUNICATION_TIME_TABLE = "ClusterCommAnalyzerTime" def __init__(self, param: dict): super().__init__(param) @@ -24,6 +28,23 @@ class CommunicationAnalysisJson(BaseAnalysisJson): self.combine_ops_total_info() self.dump_data() + def dump_db(self): + res_comm_time, res_comm_bandwidth = self.adapter.transfer_comm_from_json_to_db(self.comm_ops_struct) + output_path = os.path.join(self.collection_path, Constant.CLUSTER_ANALYSIS_OUTPUT) + result_db = os.path.join(output_path, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER) + DBManager.create_tables(result_db, self.COMMUNICATION_TIME_TABLE, self.COMMUNICATION_BANDWIDTH_TABLE) + conn, cursor = DBManager.create_connect_db(result_db) + self.execute(conn, res_comm_time, self.COMMUNICATION_TIME_TABLE) + self.execute(conn, res_comm_bandwidth, self.COMMUNICATION_BANDWIDTH_TABLE) + DBManager.destroy_db_connect(conn, cursor) + + @staticmethod + def execute(conn, res_data, table_name): + if res_data: + res_value = [list(data.values()) for data in res_data] + sql = "insert into {} values ({value})".format(table_name, value="?," * (len(res_value[0]) - 1) + "?") + DBManager.executemany_sql(conn, sql, res_value) + def compute_total_info(self, comm_ops: dict): if not comm_ops: return diff --git a/profiler/cluster_analyse/analysis/communication_matrix/__init__.py b/profiler/cluster_analyse/analysis/communication_matrix/__init__.py deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/profiler/cluster_analyse/analysis/communication_matrix/comm_matrix_analysis_db.py b/profiler/cluster_analyse/analysis/communication_matrix/comm_matrix_analysis_db.py deleted file mode 100644 index dbee80debd7ca4b9ea25636ba249fa7a83753ddb..0000000000000000000000000000000000000000 --- a/profiler/cluster_analyse/analysis/communication_matrix/comm_matrix_analysis_db.py +++ /dev/null @@ -1,136 +0,0 @@ -import os - -from analysis.base_analysis_json import BaseAnalysisJson -from common_func.db_manager import DBManager -from common_func.constant import Constant -from common_func.table_constant import TableConstant - - -class CommMatrixAnalysisDB: - COMMUNICATION_MATRIX_TABLE = "ClusterCommAnalyzerMatrix" - - def __init__(self, params: any): - self.collection_path = params.get(Constant.COLLECTION_PATH) - self.matrix_info = params.get(Constant.COMM_DATA_DICT, {}).get(Constant.MATRIX_OPS) - self.collective_group_dict = params.get(Constant.COMM_DATA_DICT, {}).get(Constant.COLLECTIVE_GROUP) - 
self.comm_matrix_struct = {} - self.res_comm_matrix = [] - - def run(self): - if not self.matrix_info: - return - self.set_rank_tuple() - self.combine_total_matrix_info() - self.dump_data() - - def dump_data(self): - if not self.res_comm_matrix: - print("[WARNING] There is no final communication_matrix data generated") - return - output_path = os.path.join(self.collection_path, Constant.CLUSTER_ANALYSIS_OUTPUT) - result_db = os.path.join(output_path, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER) - DBManager.create_tables(result_db, self.COMMUNICATION_MATRIX_TABLE) - conn, cursor = DBManager.create_connect_db(result_db) - res = [] - for data in self.res_comm_matrix: - op_name = data.get(TableConstant.OPNAME) if data.get(TableConstant.OPNAME) is not None else "" - res.append([data[TableConstant.RANK_SET], data[TableConstant.STEP], data[TableConstant.HCCL_OP_NAME], - data[TableConstant.GROUP_NAME], data[TableConstant.SRC_RANK], data[TableConstant.DST_RANK], - data[TableConstant.TRANSIT_SIZE], data[TableConstant.TRANSIT_TIME], - data[TableConstant.BANDWIDTH], data[TableConstant.TRANSPORT_TYPE], op_name]) - if res: - sql = "insert into {} values ({value})".format(self.COMMUNICATION_MATRIX_TABLE, - value="?," * (len(res[0]) - 1) + "?") - DBManager.executemany_sql(conn, sql, res) - DBManager.destroy_db_connect(conn, cursor) - - def combine_total_matrix_info(self): - for rank_tuple, group_dict in self.comm_matrix_struct.items(): - if rank_tuple != Constant.P2P: - rank_tuple = "(" + ",".join(str(i) for i in rank_tuple) + ")" - for step, step_dict in group_dict.items(): - self.merge_same_info(step_dict, rank_tuple) - self.combine_total_info(step_dict) - - def combine_total_info(self, step_dict: dict): - link_key_set = set() - for op_name, matrix_dict in step_dict.items(): - self.res_comm_matrix.extend(matrix_dict.values()) - if BaseAnalysisJson.check_add_op(op_name): - for key in matrix_dict.keys(): - link_key_set.add(key) - for link_key in link_key_set: - total_matrix_info = dict() - total_matrix_info[TableConstant.TRANSIT_SIZE] = 0.0 - total_matrix_info[TableConstant.TRANSIT_TIME] = 0.0 - for op_name, matrix_dict in step_dict.items(): - if link_key in matrix_dict.keys() and BaseAnalysisJson.check_add_op(op_name): - total_matrix_info[TableConstant.RANK_SET] = matrix_dict[link_key][TableConstant.RANK_SET] - self.combine_link_info(total_matrix_info, matrix_dict[link_key]) - bandwidth = BaseAnalysisJson.compute_ratio(total_matrix_info[TableConstant.TRANSIT_SIZE], - total_matrix_info[TableConstant.TRANSIT_TIME]) - total_matrix_info[TableConstant.HCCL_OP_NAME] = Constant.TOTAL_OP_INFO - total_matrix_info[TableConstant.GROUP_NAME] = "" - total_matrix_info[TableConstant.BANDWIDTH] = bandwidth - self.res_comm_matrix.append(total_matrix_info) - - def combine_link_info(self, link_info, data: dict): - for col in data.keys(): - if col in [TableConstant.TRANSIT_TIME, TableConstant.TRANSIT_SIZE]: - link_info[col] += data[col] - else: - link_info[col] = data[col] - - def merge_same_info(self, step_dict: dict, rank_tuple): - def process_matrix(): - for data in op_list: - if data[TableConstant.SRC_RANK] == data[TableConstant.DST_RANK]: - if data[TableConstant.SRC_RANK] not in local_global_rank_map: - local_global_rank_map[data[TableConstant.SRC_RANK]] = data[TableConstant.RANK_ID] - elif local_global_rank_map[data[TableConstant.SRC_RANK]] != data[TableConstant.RANK_ID]: - print(f"[WARNING] In the same communication group, local ranks projecting to global ranks " - f"repeat!") - if (link_key.split('-')[0] == 
data[TableConstant.SRC_RANK] and - link_key.split('-')[1] == data[TableConstant.DST_RANK]): - self.combine_link_info(matrix_info, data) - new_matrix_list[link_key] = matrix_info - - def convert_local_to_global_rank(): - res_dict = dict() - for key, new_matrix in new_matrix_list.items(): - src_rank = new_matrix[TableConstant.SRC_RANK] - dst_rank = new_matrix[TableConstant.DST_RANK] - src_rank = local_global_rank_map[src_rank] if src_rank in local_global_rank_map else src_rank - dst_rank = local_global_rank_map[dst_rank] if dst_rank in local_global_rank_map else dst_rank - bandwidth = BaseAnalysisJson.compute_ratio(new_matrix[TableConstant.TRANSIT_SIZE], - new_matrix[TableConstant.TRANSIT_TIME]) - key = f"{src_rank}-{dst_rank}" - new_matrix[TableConstant.SRC_RANK] = src_rank - new_matrix[TableConstant.DST_RANK] = dst_rank - new_matrix[TableConstant.BANDWIDTH] = bandwidth - res_dict[key] = new_matrix - return res_dict - - local_global_rank_map = dict() - for op_name, op_list in step_dict.items(): - new_matrix_list = {} - link_key_set = set() - for op_data in op_list: - link_key_set.add(op_data[TableConstant.SRC_RANK] + "-" + op_data[TableConstant.DST_RANK]) - for link_key in link_key_set: - matrix_info = dict() - matrix_info[TableConstant.RANK_SET] = rank_tuple - matrix_info[TableConstant.TRANSIT_SIZE] = 0.0 - matrix_info[TableConstant.TRANSIT_TIME] = 0.0 - process_matrix() - step_dict[op_name] = convert_local_to_global_rank() - - def set_rank_tuple(self): - for data in self.matrix_info: - op_name = data[TableConstant.HCCL_OP_NAME] + "@" + data[TableConstant.GROUP_NAME] - if data[TableConstant.STEP] == Constant.P2P: - rank_tuple = Constant.P2P - else: - rank_tuple = tuple(self.collective_group_dict.get(data[TableConstant.GROUP_NAME], [])) - self.comm_matrix_struct.setdefault(rank_tuple, {}).setdefault(data[TableConstant.STEP], {}). \ - setdefault(op_name, []).append(data) diff --git a/profiler/cluster_analyse/analysis/communication_matrix/comm_matrix_generator.py b/profiler/cluster_analyse/analysis/communication_matrix/comm_matrix_generator.py deleted file mode 100644 index 03a1826955dd33e8bdd43e91a359f0915f2121c4..0000000000000000000000000000000000000000 --- a/profiler/cluster_analyse/analysis/communication_matrix/comm_matrix_generator.py +++ /dev/null @@ -1,17 +0,0 @@ -from analysis.communication_matrix.comm_matrix_analysis_db import CommMatrixAnalysisDB -from analysis.communication_matrix.comm_matrix_analysis_json import CommMatrixAnalysisJson -from common_func.constant import Constant - - -class CommMatrixAnalysisGenerator: - - GROUP_MAP = { - Constant.DB: CommMatrixAnalysisDB, - Constant.TEXT: CommMatrixAnalysisJson - } - - def __init__(self, params: dict): - self.generator = self.GROUP_MAP.get(params.get(Constant.DATA_TYPE))(params) - - def run(self): - self.generator.run() diff --git a/profiler/cluster_analyse/cluster_data_preprocess/data_preprocessor.py b/profiler/cluster_analyse/cluster_data_preprocess/data_preprocessor.py index ebc9647c208b05f51698563b8dabb7d13c28c7ec..72d65ae6571e68564e46f43463843d1f46a3a69e 100644 --- a/profiler/cluster_analyse/cluster_data_preprocess/data_preprocessor.py +++ b/profiler/cluster_analyse/cluster_data_preprocess/data_preprocessor.py @@ -12,15 +12,30 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- +import os from abc import abstractmethod class DataPreprocessor: - def __init__(self, collection_path: str): - self.collection_path = collection_path + PROFILER_INFO_HEAD = 'profiler_info_' + PROFILER_INFO_EXTENSION = '.json' + + def __init__(self, path_list: list): + self.path_list = path_list self.data_map = {} @abstractmethod - def input_data(self): + def get_data_map(self): pass + + def get_rank_id(self, dir_name: str) -> int: + files = os.listdir(dir_name) + for file_name in files: + if file_name.startswith(self.PROFILER_INFO_HEAD) and file_name.endswith(self.PROFILER_INFO_EXTENSION): + rank_id_str = file_name[len(self.PROFILER_INFO_HEAD): -1 * len(self.PROFILER_INFO_EXTENSION)] + try: + rank_id = int(rank_id_str) + except ValueError: + rank_id = -1 + return rank_id + return -1 diff --git a/profiler/cluster_analyse/cluster_data_preprocess/mindspore_data_preprocessor.py b/profiler/cluster_analyse/cluster_data_preprocess/mindspore_data_preprocessor.py index 85debdd31bb07cf96b91c12eb731cc00b00fcaa3..a3e09983ddb54b972a9e343c1661b5c8b2cbb8c8 100644 --- a/profiler/cluster_analyse/cluster_data_preprocess/mindspore_data_preprocessor.py +++ b/profiler/cluster_analyse/cluster_data_preprocess/mindspore_data_preprocessor.py @@ -14,17 +14,14 @@ # limitations under the License. from collections import defaultdict -import os -from common_func.file_manager import FileManager -from common_func.path_manager import PathManager +from cluster_data_preprocess.data_preprocessor import DataPreprocessor -class MindsporeDataPreprocessor: - PROFILER_INFO_HEAD = 'profiler_info_' - PROFILER_INFO_EXTENSION = '.json' - def __init__(self, path_list: str): - self.path_list = path_list +class MindsporeDataPreprocessor(DataPreprocessor): + + def __init__(self, path_list: list): + super().__init__(path_list) def get_data_map(self) -> dict: rank_id_map = defaultdict(list) @@ -35,23 +32,10 @@ class MindsporeDataPreprocessor: continue rank_id_map[rank_id].append(dir_name) - ret_dict = dict() try: for (rank_id, dir_list) in rank_id_map.items(): dir_list.sort(key=lambda x: x.split('_')[-3]) - ret_dict[rank_id] = dir_list[0] + self.data_map[rank_id] = dir_list[0] except Exception as e: raise RuntimeError("Found invalid directory name!") from e - return ret_dict - - def get_rank_id(self, dir_name: str) -> int: - files = os.listdir(dir_name) - for file_name in files: - if file_name.startswith(self.PROFILER_INFO_HEAD) and file_name.endswith(self.PROFILER_INFO_EXTENSION): - rank_id_str = file_name[len(self.PROFILER_INFO_HEAD): -1 * len(self.PROFILER_INFO_EXTENSION)] - try: - rank_id = int(rank_id_str) - except ValueError: - rank_id = -1 - return rank_id - return -1 + return self.data_map diff --git a/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py b/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py index 7b5561284550f9f2776ccdcbb363cc8f1c7f2fbb..55c3d03958b97c427fe8fde0625e72ea4dee8997 100644 --- a/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py +++ b/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py @@ -16,21 +16,16 @@ import glob from collections import defaultdict import os +from cluster_data_preprocess.data_preprocessor import DataPreprocessor from common_func.constant import Constant from common_func.file_manager import FileManager -from common_func.path_manager import PathManager -class PytorchDataPreprocessor: - PROFILER_INFO_HEAD = 'profiler_info_' - PROFILER_INFO_EXTENSION = '.json' - JSON_RESULT_INFO = 
"*.json" - CSV_RESULT_INFO = "*.csv" +class PytorchDataPreprocessor(DataPreprocessor): - def __init__(self, path_list: str): - self.path_list = path_list - self.db_count = 0 - self.text_count = 0 + def __init__(self, path_list: list): + super().__init__(path_list) + self.data_type = set() def get_data_map(self) -> dict: rank_id_map = defaultdict(list) @@ -39,49 +34,23 @@ class PytorchDataPreprocessor: if rank_id < 0: print('[Error]fail to get rankid or rankid invalid.') continue - folder_path = os.path.join(dir_name, Constant.SINGLE_OUTPUT) - db_files = glob.glob(os.path.join(folder_path, Constant.DB_COMMUNICATION_ANALYZER)) - text_files = (glob.glob(os.path.join(folder_path, self.JSON_RESULT_INFO)) + - glob.glob(os.path.join(folder_path, self.CSV_RESULT_INFO))) - if text_files and db_files: - print(f"[ERROR] Rank {rank_id} has both db and text files") - self.db_count, self.text_count = 1, 1 - break - if db_files: - self.db_count += 1 - elif text_files: - self.text_count += 1 - else: - print(f"[WARNING] Rank {rank_id} has no valid files") - continue + for file_name in os.listdir(dir_name): + if file_name.startswith(self.PROFILER_INFO_HEAD) and file_name.endswith(self.PROFILER_INFO_EXTENSION): + file_path = os.path.join(dir_name, file_name) + config = FileManager.read_json_file(file_path) + self.data_type.add(config.get(Constant.CONFIG, {}).get(Constant.EXPER_CONFIG, {}). + get(Constant.EXPORT_TYPE, Constant.TEXT)) rank_id_map[rank_id].append(dir_name) - ret_dict = dict() try: for (rank_id, dir_list) in rank_id_map.items(): dir_list.sort(key=lambda x: x.split('_')[-3]) - ret_dict[rank_id] = dir_list[0] + self.data_map[rank_id] = dir_list[0] except Exception as e: raise RuntimeError("Found invalid directory name!") from e - return ret_dict - - def get_rank_id(self, dir_name: str) -> int: - files = os.listdir(dir_name) - for file_name in files: - if file_name.startswith(self.PROFILER_INFO_HEAD) and file_name.endswith(self.PROFILER_INFO_EXTENSION): - rank_id_str = file_name[len(self.PROFILER_INFO_HEAD): -1 * len(self.PROFILER_INFO_EXTENSION)] - try: - rank_id = int(rank_id_str) - except ValueError: - rank_id = -1 - return rank_id - return -1 + return self.data_map def get_data_type(self): - if self.db_count != 0 and self.text_count != 0: - return Constant.INVALID - if self.db_count != 0: - return Constant.DB - if self.text_count != 0: - return Constant.TEXT + if len(self.data_type) == 1: + return self.data_type.pop() return Constant.INVALID diff --git a/profiler/cluster_analyse/common_func/constant.py b/profiler/cluster_analyse/common_func/constant.py index 200244aff4a039ec25dead6b2a9f92248b496f1e..3b4126de792357b6a7a0d4d0d4dbce40067c4651 100644 --- a/profiler/cluster_analyse/common_func/constant.py +++ b/profiler/cluster_analyse/common_func/constant.py @@ -57,6 +57,9 @@ class Constant(object): OP_NAME = "Op Name" BANDWIDTH_GB_S = "Bandwidth(GB/s)" COMMUNICATION = "communication.json" + ELAPSE_TIME_MS = "Elapse Time(ms)" + IDLE_TIME_MS = "Idle Time(ms)" + LARGE_PACKET_RATIO = "Large Packet Ratio" # params DATA_MAP = "data_map" @@ -95,3 +98,8 @@ class Constant(object): TABLE_COMM_ANALYZER_TIME = "CommAnalyzerTime" TABLE_COMM_ANALYZER_MATRIX = "CommAnalyzerMatrix" TABLE_STEP_TRACE = "StepTraceTime" + + # data config key + CONFIG = "config" + EXPER_CONFIG = "experimental_config" + EXPORT_TYPE = "_export_type" diff --git a/profiler/cluster_analyse/common_func/db_manager.py b/profiler/cluster_analyse/common_func/db_manager.py index 
bdee49be60640362d54a697ca202e9d54a58d4f8..039473d70744412bb9e3cf89ef1b3db53d8b817e 100644 --- a/profiler/cluster_analyse/common_func/db_manager.py +++ b/profiler/cluster_analyse/common_func/db_manager.py @@ -117,11 +117,13 @@ class DBManager: def create_tables(cls, db_path: any, *tables: any): conn, curs = cls.create_connect_db(db_path) for table_name in tables: - if not cls.judge_table_exists(curs, table_name): - table_map = "{0}Map".format(table_name) - header_with_type = cls.sql_generate_table(table_map) - sql = "CREATE TABLE IF NOT EXISTS " + table_name + header_with_type - cls.execute_sql(conn, sql) + if cls.judge_table_exists(curs, table_name): + drop_sql = "drop table {0}".format(table_name) + cls.execute_sql(conn, drop_sql) + table_map = "{0}Map".format(table_name) + header_with_type = cls.sql_generate_table(table_map) + sql = "CREATE TABLE IF NOT EXISTS " + table_name + header_with_type + cls.execute_sql(conn, sql) @staticmethod def execute_sql(conn: any, sql: str, params: any = None) -> bool: diff --git a/profiler/cluster_analyse/communication_group/base_communication_group.py b/profiler/cluster_analyse/communication_group/base_communication_group.py index a275fefe75d0003bd68793269cbba74f31806bd1..923d479ee736edabc4c9e7a137664f3426b593e8 100644 --- a/profiler/cluster_analyse/communication_group/base_communication_group.py +++ b/profiler/cluster_analyse/communication_group/base_communication_group.py @@ -20,6 +20,7 @@ from copy import deepcopy from multiprocessing import Pool from common_func.constant import Constant +from utils.data_transfer_adapter import DataTransferAdapter class BaseCommunicationGroup: @@ -33,6 +34,9 @@ class BaseCommunicationGroup: self.collective_group_dict = defaultdict(set) self.p2p_comm_group = [] self.communication_group = {} + self.communication_ops = [] + self.matrix_ops = [] + self.adapter = DataTransferAdapter() def load_communication_data(self): comm_op_dirs = [] @@ -102,21 +106,106 @@ class BaseCommunicationGroup: def read_communication_func(self, params: tuple): pass - @abstractmethod def analyze_communication_data(self): - pass + for rank_id, rank_id_comm_dict, rank_id_matrix_dict in self.rank_comm_dir_dict: + for step_id, step_id_dict in rank_id_comm_dict.items(): + if not isinstance(step_id_dict, dict): + print(f"[WARNING] rank{rank_id}'s communication.json has a wrong data struct.") + continue + self.get_collective_ops_name(rank_id, step_id_dict.get(Constant.COLLECTIVE)) + for comm_op_type, comm_op_dict in step_id_dict.items(): + self.add_communication_ops(rank_id, step_id, comm_op_type, comm_op_dict) + + for step_id, step_id_dict in rank_id_matrix_dict.items(): + if not isinstance(step_id_dict, dict): + print(f"[WARNING] rank{rank_id}'s communication_matrix.json has a wrong data struct.") + continue + self.set_p2p_link(rank_id, step_id, rank_id_matrix_dict) + self.get_collective_ops_name(rank_id, step_id_dict.get(Constant.COLLECTIVE)) @abstractmethod def dump_data(self): pass + def collect_comm_data(self): + comm_data_dict = { + Constant.COLLECTIVE_GROUP: self.collective_group_dict, + Constant.COMMUNICATION_OPS: self.communication_ops, + Constant.MATRIX_OPS: self.matrix_ops, + Constant.COMMUNICATION_GROUP: self.communication_group + } + return comm_data_dict + def generate(self): self.load_communication_data() self.analyze_communication_data() self.set_p2p_groups() self.generate_collective_communication_group() self.generate_p2p_communication_group() - return self.dump_data() + self.dump_data() + return self.collect_comm_data() + + def 
set_p2p_link(self, rank_id: int, step_id: str, rank_id_matrix_dict: dict): + ops = rank_id_matrix_dict.get(step_id, {}) + self.add_matrix_ops(rank_id, step_id, ops) + if not ops: + print(f"[WARNING] rank{rank_id} {step_id} do not have communication matrix ops data.") + return + p2p_ops = ops.get(Constant.P2P, {}) + for op_name, link_dict in p2p_ops.items(): + self.append_p2p_link(op_name, link_dict) + + def append_p2p_link(self, op_name, link_dict): + for link in link_dict: + if '-' not in link: + print(f"[WARNING] {op_name} has an invalid link key {link}!") + break + src_rank = int(link.split('-')[0]) + dst_rank = int(link.split('-')[1]) + if src_rank != dst_rank: + rank_set = {src_rank, dst_rank} + if rank_set in self.p2p_link: + continue + self.p2p_link.append(rank_set) + + def get_collective_ops_name(self, rank_id: int, comm_op_dict: dict): + for comm_op in comm_op_dict: + if comm_op.startswith('Total'): + continue + group_name = comm_op.split('@')[-1] + self.collective_group_dict[group_name].add(rank_id) + + def add_communication_ops(self, rank_id: str, step_id: str, comm_op_type: str, comm_op_dict: dict): + for comm_op in comm_op_dict: + if comm_op.startswith('Total'): + continue + group_name = comm_op.split('@')[-1] + self.communication_ops.append({ + Constant.RANK_ID: rank_id, + Constant.STEP_ID: step_id, + Constant.COMM_OP_TYPE: comm_op_type, + Constant.COMM_OP_NAME: comm_op, + Constant.GROUP_NAME: group_name, + Constant.COMM_OP_INFO: comm_op_dict.get(comm_op) + }) + + def add_matrix_ops(self, rank_id: int, step_id: str, step_id_dict: dict): + for comm_op_type, comm_dict in step_id_dict.items(): + if comm_op_type != Constant.COLLECTIVE and comm_op_type != Constant.P2P: + print(f"[WARNING] Unknown communication operators type!") + continue + for op_name, op_link_info in comm_dict.items(): + if op_name.startswith('Total'): + continue + group_name = op_name.split('@')[-1] + self.matrix_ops.append({ + Constant.RANK_ID: rank_id, + Constant.STEP_ID: step_id, + Constant.COMM_OP_TYPE: comm_op_type, + Constant.COMM_OP_NAME: op_name, + Constant.GROUP_NAME: group_name, + Constant.COMM_OP_INFO: op_link_info + }) class UnionFind(object): diff --git a/profiler/cluster_analyse/communication_group/communication_db_group.py b/profiler/cluster_analyse/communication_group/communication_db_group.py index c61411edab2f7ba9a619680d9803e1f5302c3966..510dcd971357dfb4798e4d284a72fbb3f3a21859 100644 --- a/profiler/cluster_analyse/communication_group/communication_db_group.py +++ b/profiler/cluster_analyse/communication_group/communication_db_group.py @@ -2,7 +2,6 @@ import os from common_func.db_manager import DBManager from common_func.constant import Constant -from common_func.table_constant import TableConstant from communication_group.base_communication_group import BaseCommunicationGroup @@ -11,18 +10,15 @@ class CommunicationDBGroup(BaseCommunicationGroup): def __init__(self, params: dict): super().__init__(params) - self.communication_bandwidth_info = [] - self.communication_time_info = [] - self.matrix_info = [] def read_communication_func(self, params: tuple): if len(params) < 3: return -1, ({}, {}, {}) rank_id = params[0] db_path = params[1] - time_data = {} - bandwidth_data = {} - matrix_data = {} + time_data = [] + bandwidth_data = [] + matrix_data = [] if os.path.exists(db_path): conn, cursor = DBManager.create_connect_db(db_path) time_info_sql = "select * from {0}".format(Constant.TABLE_COMM_ANALYZER_TIME) @@ -37,15 +33,9 @@ class CommunicationDBGroup(BaseCommunicationGroup): and 
self.analysis_mode in ["all", "communication_matrix"]): matrix_data = DBManager.fetch_all_data(cursor, matrix_info_sql) DBManager.destroy_db_connect(conn, cursor) - return rank_id, (self.data_group_by_step(time_data), self.data_group_by_step(bandwidth_data), - self.data_group_by_step(matrix_data)) - - @staticmethod - def data_group_by_step(data: any) -> any: - res = {} - for item in data: - res.setdefault(item[TableConstant.STEP], []).append(item) - return res + comm_data = self.adapter.transfer_comm_from_db_to_json(time_data, bandwidth_data) + comm_matrix_data = self.adapter.transfer_matrix_from_db_to_json(matrix_data) + return rank_id, comm_data, comm_matrix_data def dump_data(self): output_path = os.path.join(self.collection_path, Constant.CLUSTER_ANALYSIS_OUTPUT) @@ -65,46 +55,3 @@ class CommunicationDBGroup(BaseCommunicationGroup): DBManager.destroy_db_connect(conn, cursor) else: print("[WARNING] The CommunicationGroup table won't be created because no data has been calculated.") - comm_data_dict = { - Constant.COLLECTIVE_GROUP: self.collective_group_dict, - Constant.COMMUNICATION_TIME_INFO: self.communication_time_info, - Constant.COMMUNICATION_BANDWIDTH_INFO: self.communication_bandwidth_info, - Constant.MATRIX_OPS: self.matrix_info, - Constant.COMMUNICATION_GROUP: self.communication_group - } - return comm_data_dict - - def analyze_communication_data(self): - for rank_id, data_tuple in self.rank_comm_dir_dict: - time_data, bandwidth_data, matrix_data = data_tuple[0], data_tuple[1], data_tuple[2] - for step, data_list in time_data.items(): - for data in data_list: - self.compute_collective_group(data, rank_id, self.communication_time_info) - for step, data_list in bandwidth_data.items(): - for data in data_list: - self.compute_collective_group(data, rank_id, self.communication_bandwidth_info) - for step, data_list in matrix_data.items(): - self.add_p2p_and_rank(rank_id, step, matrix_data) - for data in data_list: - self.compute_collective_group(data, rank_id, self.matrix_info) - - def compute_collective_group(self, data, rank_id, res_list): - if data[TableConstant.TYPE] == Constant.COLLECTIVE: - self.collective_group_dict[data[TableConstant.GROUP_NAME]].add(rank_id) - data[TableConstant.RANK_ID] = rank_id - res_list.append(data) - - def add_p2p_and_rank(self, rank_id: int, step: str, data_dict: dict): - data_list = data_dict[step] - if not data_list: - print(f"[WARNING] rank {rank_id} {step} don't have communication matrix ops data") - return - for data in data_list: - if data[TableConstant.TYPE] != Constant.COLLECTIVE and data[TableConstant.TYPE] != Constant.P2P: - print(f"[WARNING] Unknown communication operators type!") - continue - if data[TableConstant.TYPE] == Constant.P2P: - if data[TableConstant.SRC_RANK] != data[TableConstant.DST_RANK]: - rank_set = {data[TableConstant.SRC_RANK], data[TableConstant.DST_RANK]} - if rank_set not in self.p2p_link: - self.p2p_link.append(rank_set) diff --git a/profiler/cluster_analyse/communication_group/communication_json_group.py b/profiler/cluster_analyse/communication_group/communication_json_group.py index da6e6c1fe4f699af49ad198df41afb80e34e8772..f6e01e3abfde4d8f180043a5bf9a50c6b5a4964c 100644 --- a/profiler/cluster_analyse/communication_group/communication_json_group.py +++ b/profiler/cluster_analyse/communication_group/communication_json_group.py @@ -25,35 +25,9 @@ class CommunicationJsonGroup(BaseCommunicationGroup): def __init__(self, params: dict): super().__init__(params) - self.communication_ops = [] - self.matrix_ops = [] def 
dump_data(self): FileManager.create_json_file(self.collection_path, self.communication_group, self.COMMUNICATION_GROUP_JSON) - comm_data_dict = { - Constant.COLLECTIVE_GROUP: self.collective_group_dict, - Constant.COMMUNICATION_OPS: self.communication_ops, - Constant.MATRIX_OPS: self.matrix_ops, - Constant.COMMUNICATION_GROUP: self.communication_group - } - return comm_data_dict - - def analyze_communication_data(self): - for rank_id, rank_id_comm_dict, rank_id_matrix_dict in self.rank_comm_dir_dict: - for step_id, step_id_dict in rank_id_comm_dict.items(): - if not isinstance(step_id_dict, dict): - print(f"[WARNING] rank{rank_id}'s communication.json has a wrong data struct.") - continue - self.get_collective_ops_name(rank_id, step_id_dict.get(Constant.COLLECTIVE)) - for comm_op_type, comm_op_dict in step_id_dict.items(): - self.add_communication_ops(rank_id, step_id, comm_op_type, comm_op_dict) - - for step_id, step_id_dict in rank_id_matrix_dict.items(): - if not isinstance(step_id_dict, dict): - print(f"[WARNING] rank{rank_id}'s communication_matrix.json has a wrong data struct.") - continue - self.set_p2p_link(rank_id, step_id, rank_id_matrix_dict) - self.get_collective_ops_name(rank_id, step_id_dict.get(Constant.COLLECTIVE)) def read_communication_func(self: any, params: tuple): if len(params) < 3: @@ -68,65 +42,3 @@ class CommunicationJsonGroup(BaseCommunicationGroup): if os.path.exists(matrix_json_path) and self.analysis_mode in ["all", "communication_matrix"]: matrix_data = FileManager.read_json_file(matrix_json_path) return rank_id, comm_data, matrix_data - - def set_p2p_link(self, rank_id: int, step_id: str, rank_id_matrix_dict: dict): - ops = rank_id_matrix_dict.get(step_id, {}) - self.add_matrix_ops(rank_id, step_id, ops) - if not ops: - print(f"[WARNING] rank{rank_id} {step_id} do not have communication matrix ops data.") - return - p2p_ops = ops.get(Constant.P2P, {}) - for op_name, link_dict in p2p_ops.items(): - self.append_p2p_link(op_name, link_dict) - - def append_p2p_link(self, op_name, link_dict): - for link in link_dict: - if '-' not in link: - print(f"[WARNING] {op_name} has an invalid link key {link}!") - break - src_rank = int(link.split('-')[0]) - dst_rank = int(link.split('-')[1]) - if src_rank != dst_rank: - rank_set = set([src_rank, dst_rank]) - if rank_set in self.p2p_link: - continue - self.p2p_link.append(rank_set) - - def get_collective_ops_name(self, rank_id: int, comm_op_dict: dict): - for comm_op in comm_op_dict: - if comm_op.startswith('Total'): - continue - group_name = comm_op.split('@')[-1] - self.collective_group_dict[group_name].add(rank_id) - - def add_communication_ops(self, rank_id: str, step_id: str, comm_op_type: str, comm_op_dict: dict): - for comm_op in comm_op_dict: - if comm_op.startswith('Total'): - continue - group_name = comm_op.split('@')[-1] - self.communication_ops.append({ - Constant.RANK_ID: rank_id, - Constant.STEP_ID: step_id, - Constant.COMM_OP_TYPE: comm_op_type, - Constant.COMM_OP_NAME: comm_op, - Constant.GROUP_NAME: group_name, - Constant.COMM_OP_INFO: comm_op_dict.get(comm_op) - }) - - def add_matrix_ops(self, rank_id: int, step_id: str, step_id_dict: dict): - for comm_op_type, comm_dict in step_id_dict.items(): - if comm_op_type != Constant.COLLECTIVE and comm_op_type != Constant.P2P: - print(f"[WARNING] Unknown communication operators type!") - continue - for op_name, op_link_info in comm_dict.items(): - if op_name.startswith('Total'): - continue - group_name = op_name.split('@')[-1] - self.matrix_ops.append({ - 
Constant.RANK_ID: rank_id, - Constant.STEP_ID: step_id, - Constant.COMM_OP_TYPE: comm_op_type, - Constant.COMM_OP_NAME: op_name, - Constant.GROUP_NAME: group_name, - Constant.COMM_OP_INFO: op_link_info - }) diff --git a/profiler/cluster_analyse/analysis/communication/__init__.py b/profiler/cluster_analyse/utils/__init__.py similarity index 100% rename from profiler/cluster_analyse/analysis/communication/__init__.py rename to profiler/cluster_analyse/utils/__init__.py diff --git a/profiler/cluster_analyse/utils/data_transfer_adapter.py b/profiler/cluster_analyse/utils/data_transfer_adapter.py new file mode 100644 index 0000000000000000000000000000000000000000..1f306415fa789ae0dab7d8751b1c240b3433de0d --- /dev/null +++ b/profiler/cluster_analyse/utils/data_transfer_adapter.py @@ -0,0 +1,142 @@ +import copy + +from common_func.constant import Constant +from common_func.table_constant import TableConstant + + +class DataTransferAdapter(object): + COMM_TIME_TABLE_COLUMN = [TableConstant.START_TIMESTAMP, TableConstant.ELAPSED_TIME, TableConstant.TRANSIT_TIME, + TableConstant.WAIT_TIME, TableConstant.SYNCHRONIZATION_TIME, TableConstant.IDLE_TIME, + TableConstant.SYNCHRONIZATION_TIME_RATIO, TableConstant.WAIT_TIME_RATIO] + COMM_TIME_JSON_COLUMN = [Constant.START_TIMESTAMP, Constant.ELAPSE_TIME_MS, Constant.TRANSIT_TIME_MS, + Constant.WAIT_TIME_MS, Constant.SYNCHRONIZATION_TIME_MS, Constant.IDLE_TIME_MS, + Constant.SYNCHRONIZATION_TIME_RATIO, Constant.WAIT_TIME_RATIO] + MATRIX_TABLE_COLUMN = [TableConstant.TRANSIT_SIZE, TableConstant.TRANSIT_TIME, TableConstant.BANDWIDTH, + TableConstant.TRANSPORT_TYPE, TableConstant.OPNAME] + MATRIX_JSON_COLUMN = [Constant.TRANSIT_SIZE_MB, Constant.TRANSIT_TIME_MS, Constant.BANDWIDTH_GB_S, + Constant.TRANSPORT_TYPE, Constant.OP_NAME] + COMM_BD_TABLE_COLUMN = [TableConstant.TRANSIT_SIZE, TableConstant.TRANSIT_TIME, TableConstant.BANDWIDTH, + TableConstant.LARGE_PACKET_RATIO] + COMM_BD_JSON_COLUMN = [Constant.TRANSIT_SIZE_MB, Constant.TRANSIT_TIME_MS, Constant.BANDWIDTH_GB_S, + Constant.LARGE_PACKET_RATIO] + + def __init__(self): + super().__init__() + + def transfer_comm_from_db_to_json(self, time_info: list, bandwidth_info: list): + result = {} + if not time_info and not bandwidth_info: + return result + for time_data in time_info: + comm_time = dict() + hccl_name = time_data[TableConstant.HCCL_OP_NAME] + "@" + time_data[TableConstant.GROUP_NAME] + for key, value in dict(zip(self.COMM_TIME_JSON_COLUMN, self.COMM_TIME_TABLE_COLUMN)).items(): + if not key.endswith("ratio"): + comm_time[key] = time_data.get(value, 0) + result.setdefault(time_data[TableConstant.STEP], {}).setdefault(time_data[TableConstant.TYPE], {}). 
\ + setdefault(hccl_name, {})[Constant.COMMUNICATION_TIME_INFO] = comm_time + hccl_set = set() + for bd_data in bandwidth_info: + hccl_name = bd_data[TableConstant.HCCL_OP_NAME] + "@" + bd_data[TableConstant.GROUP_NAME] + hccl_set.add(hccl_name) + for hccl in hccl_set: + comm_bd = dict() + for bd_data in bandwidth_info: + if hccl == (bd_data[TableConstant.HCCL_OP_NAME] + "@" + bd_data[TableConstant.GROUP_NAME]): + temp_dict = dict() + key_dict = dict(zip(self.COMM_BD_JSON_COLUMN, self.COMM_BD_TABLE_COLUMN)) + self.set_value_by_key(temp_dict, bd_data, key_dict) + comm_bd.setdefault(bd_data[TableConstant.TRANSPORT_TYPE], temp_dict).setdefault( + Constant.SIZE_DISTRIBUTION, {})[bd_data[TableConstant.PACKAGE_SIZE]] = \ + [bd_data[TableConstant.COUNT], bd_data[TableConstant.TOTAL_DURATION]] + result.setdefault(bd_data[TableConstant.STEP], {}).setdefault(bd_data[TableConstant.TYPE], {}). \ + setdefault(hccl, {})[Constant.COMMUNICATION_BANDWIDTH_INFO] = comm_bd + return result + + def transfer_comm_from_json_to_db(self, res_data: dict): + res_comm_data, res_bd_data = list(), list() + + def split_comm_time(): + for rank_id, comm_data in op_data.items(): + time_data = comm_data.get(Constant.COMMUNICATION_TIME_INFO) + res_time = set_only_value(rank_id) + for key, value in dict(zip(self.COMM_TIME_TABLE_COLUMN, self.COMM_TIME_JSON_COLUMN)).items(): + res_time[key] = time_data.get(value, 0) + res_comm_data.append(res_time) + bd_data = comm_data.get(Constant.COMMUNICATION_BANDWIDTH_INFO, {}) + for transport_type, data in bd_data.items(): + res_bandwidth = set_only_value(rank_id) + key_dict = dict(zip(self.COMM_BD_TABLE_COLUMN, self.COMM_BD_JSON_COLUMN)) + res_bandwidth[TableConstant.TRANSPORT_TYPE] = transport_type + self.set_value_by_key(res_bandwidth, data, key_dict) + for key, value in data.get(Constant.SIZE_DISTRIBUTION, {}).items(): + res_bandwidth[TableConstant.PACKAGE_SIZE] = key + res_bandwidth[TableConstant.COUNT] = value[0] + res_bandwidth[TableConstant.TOTAL_DURATION] = value[1] + temp_dict = copy.deepcopy(res_bandwidth) + res_bd_data.append(temp_dict) + + def set_only_value(rank_id): + res_dict = dict() + res_dict[TableConstant.RANK_SET] = str(rank_set) + res_dict[TableConstant.STEP] = step + res_dict[TableConstant.RANK_ID] = rank_id + res_dict[TableConstant.HCCL_OP_NAME] = op_name.split("@")[0] if "@" in op_name else op_name + res_dict[TableConstant.GROUP_NAME] = op_name.split("@")[1] if "@" in op_name else "" + return res_dict + + for rank_set, step_dict in res_data.items(): + for step, op_dict in step_dict.items(): + for op_name, op_data in op_dict.items(): + split_comm_time() + return res_comm_data, res_bd_data + + def set_value_by_key(self, src_dict, dst_dict, key_dict): + for key, value in key_dict.items(): + src_dict[key] = dst_dict.get(value, 0) + + def transfer_matrix_from_db_to_json(self, matrix_data: list): + result = {} + if not matrix_data: + return result + hccl_set = set() + for data in matrix_data: + hccl = data[TableConstant.HCCL_OP_NAME] + "@" + data[TableConstant.GROUP_NAME] + hccl_set.add(hccl) + for hccl in hccl_set: + for data in matrix_data: + if hccl == (data[TableConstant.HCCL_OP_NAME] + "@" + data[TableConstant.GROUP_NAME]): + key = data[TableConstant.SRC_RANK] + '-' + data[TableConstant.DST_RANK] + temp_dict = dict() + key_dict = dict(zip(self.MATRIX_JSON_COLUMN, self.MATRIX_TABLE_COLUMN)) + self.set_value_by_key(temp_dict, data, key_dict) + result.setdefault(data[TableConstant.STEP], {}).setdefault(data[TableConstant.TYPE], {}). 
\ + setdefault(hccl, {}).setdefault(key, temp_dict) + return result + + def transfer_matrix_from_json_to_db(self, res_data: dict): + result = list() + + def split_matrix_data(): + for op_name, op_data in op_dict.items(): + for link_key, link_data in op_data.items(): + if "@" in op_name: + hccl_op_name, group_name = op_name.split("@")[0], op_name.split("@")[1] + else: + hccl_op_name, group_name = op_name, "" + matrix_data = { + TableConstant.RANK_SET: str(rank_set), + TableConstant.STEP: step, + TableConstant.HCCL_OP_NAME: hccl_op_name, + TableConstant.GROUP_NAME: group_name, + TableConstant.SRC_RANK: link_key.split("-")[0], + TableConstant.DST_RANK: link_key.split("-")[1] + } + key_dict = dict(zip(self.MATRIX_TABLE_COLUMN, self.MATRIX_JSON_COLUMN)) + self.set_value_by_key(matrix_data, link_data, key_dict) + result.append(matrix_data) + + for rank_set, step_dict in res_data.items(): + for step, op_dict in step_dict.items(): + split_matrix_data() + return result
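
The `dump_db` methods introduced above (in `CommunicationAnalysis` and `CommMatrixAnalysis`) share one pattern: flatten the adapter's row dicts into value lists, build an `insert` statement with one `?` placeholder per column, and bulk-insert via `executemany`. Below is a minimal self-contained sketch of that pattern using the standard `sqlite3` module directly; the table name `DemoMatrix`, its columns, the sample rows, and the database path are illustrative stand-ins, not the real `ClusterCommAnalyzerMatrix` schema or `DBManager` API.

import sqlite3

# Illustrative rows shaped like the dicts returned by
# DataTransferAdapter.transfer_matrix_from_json_to_db(): one dict per link.
rows = [
    {"rank_set": "(0,1)", "step": "step", "src_rank": "0", "dst_rank": "1", "bandwidth": 12.5},
    {"rank_set": "(0,1)", "step": "step", "src_rank": "1", "dst_rank": "0", "bandwidth": 11.8},
]

conn = sqlite3.connect("cluster_analysis_output.db")  # hypothetical output path
cursor = conn.cursor()
cursor.execute(
    "CREATE TABLE IF NOT EXISTS DemoMatrix "
    "(rank_set TEXT, step TEXT, src_rank TEXT, dst_rank TEXT, bandwidth REAL)"
)

# Same placeholder construction as dump_db: "?," * (n - 1) + "?" yields
# exactly one "?" per column, so the row width drives the statement.
values = [list(data.values()) for data in rows]
sql = "insert into DemoMatrix values ({value})".format(value="?," * (len(values[0]) - 1) + "?")
cursor.executemany(sql, values)
conn.commit()
conn.close()

Because Python dicts preserve insertion order, `list(data.values())` keeps columns aligned with the table definition as long as every row dict is built with the same key sequence — which is why the adapter fills each record's keys in a fixed order before the rows reach `dump_db`.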