diff --git a/profiler/advisor/dataset/cluster/cluster_dataset.py b/profiler/advisor/dataset/cluster/cluster_dataset.py
index e1163f1cdd84265eb5cc5e356753cad5fa663339..6d4e643b0aaddf5d0385a7930f0864582b918f1b 100644
--- a/profiler/advisor/dataset/cluster/cluster_dataset.py
+++ b/profiler/advisor/dataset/cluster/cluster_dataset.py
@@ -17,13 +17,16 @@ logger = logging.getLogger()
 
 class ClusterDataset(Dataset):
     def __init__(self, collection_path, data: dict, **kwargs) -> None:
+        self.cluster_analysis_output_path = kwargs.get(Constant.CLUSTER_ANALYSIS_OUTPUT_PATH)
+        if not self.cluster_analysis_output_path:
+            self.cluster_analysis_output_path = collection_path
         super().__init__(collection_path, data)
 
     def is_cluster_analysis_output_exist(self):
         """
         check whether input path is valid
         """
-        for file in os.listdir(self.collection_path):
+        for file in os.listdir(self.cluster_analysis_output_path):
             if file == 'cluster_analysis_output':
                 logger.info("[INFO]Cluster has been analyzed "
                             "because of the existence of cluster analysis output directory.")
@@ -36,7 +39,8 @@ class ClusterDataset(Dataset):
             return
         parameter = {
             Constant.COLLECTION_PATH: self.collection_path,
-            Constant.ANALYSIS_MODE: "all"
+            Constant.ANALYSIS_MODE: "all",
+            Constant.CLUSTER_ANALYSIS_OUTPUT_PATH: self.cluster_analysis_output_path
         }
         print("[INFO] cluster analysis is in the process, please wait...")
         try:
@@ -45,7 +49,7 @@ class ClusterDataset(Dataset):
             raise ValueError(f"Cluster analyze backend failed:{e}") from e
 
     def load_csv_data(self, file_name, dataBean):
-        csv_path = os.path.join(self.collection_path, const.CLUSTER_ANALYSIS_OUTPUT, file_name)
+        csv_path = os.path.join(self.cluster_analysis_output_path, const.CLUSTER_ANALYSIS_OUTPUT, file_name)
         if not os.path.exists(csv_path):
             msg = "[ERROR] cluster_step_trace_time.csv doesn't exist, terminate analysis."
             raise RuntimeError(msg)
@@ -53,7 +57,7 @@ class ClusterDataset(Dataset):
         return data
 
     def load_json_data(self, file_name):
-        json_path = os.path.join(self.collection_path, const.CLUSTER_ANALYSIS_OUTPUT, file_name)
+        json_path = os.path.join(self.cluster_analysis_output_path, const.CLUSTER_ANALYSIS_OUTPUT, file_name)
         if not os.path.exists(json_path):
             msg = "[ERROR] cluster_communication.json doesn't exist, terminate analysis."
             raise RuntimeError(msg)
@@ -67,7 +71,7 @@ class ClusterStepTraceTimeDataset(ClusterDataset):
 
     def __init__(self, collection_path: str, data: dict, **kwargs):
         self._step_dict = defaultdict()
-        super().__init__(collection_path, data)
+        super().__init__(collection_path, data, **kwargs)
 
     def _parse(self):
         self.cluster_analyze()
@@ -114,7 +118,7 @@ class ClusterCommunicationDataset(ClusterDataset):
             self.SDMA_TIME_MS: 0,
             self.SDMA_SIZE_MB: 0,
         })
-        super().__init__(collection_path, data)
+        super().__init__(collection_path, data, **kwargs)
 
     @staticmethod
     def compute_ratio(dividend: float, divisor: float):
diff --git a/profiler/cli/analyze_cli.py b/profiler/cli/analyze_cli.py
index f400a265b7bfcab1e5f19513a3eea43fea5250ce..72c468dcc2acb4f118dd6a8841cccfcdc5d85da0 100644
--- a/profiler/cli/analyze_cli.py
+++ b/profiler/cli/analyze_cli.py
@@ -64,6 +64,8 @@ def analyze_cli(**kwargs):
               help='Directory of profiling data')
 @click.option('--benchmark_profiling_path', '-bp', 'benchmark_profiling_path', type=click.Path(),
               help='Directory of benchmark profiling data, used for compare performance')
+@click.option('--cluster_analysis_output_path', '-cp', 'cluster_analysis_output_path', type=click.Path(),
+              help='Directory of cluster analysis output')
 @click.option('--cann_version', '-cv', 'cann_version',
               type=click.Choice(constant.SUPPORTED_CANN_VERSION, case_sensitive=False),
               default=constant.DEFAULT_CANN_VERSION,
@@ -131,4 +133,4 @@ def analyze_schedule(**kwargs) -> None:
               help="enter the profiling type, selectable range ascend_pytorch_profiler, mslite ,msprof")
 @debug_option
 def analyze_computation(**kwargs) -> None:
-    _analyze(["computation"], **kwargs)
\ No newline at end of file
+    _analyze(["computation"], **kwargs)
diff --git a/profiler/cli/cluster_cli.py b/profiler/cli/cluster_cli.py
index c1563898d701f0a602e9e02b6f3929320c0d3beb..00957277bab59f675101f500413de6c19973c294 100644
--- a/profiler/cli/cluster_cli.py
+++ b/profiler/cli/cluster_cli.py
@@ -34,7 +34,12 @@ context_settings['ignore_unknown_options'] = True
 @click.option('--profiling_path', '-d', type=click.Path(), required=True,
               help='path of the profiling data')
 @click.option('--mode', '-m', type=click.Choice(COMM_FEATURE_LIST), default='all')
+@click.option('--cluster_analysis_output_path', '-cp', 'cluster_analysis_output_path', type=click.Path(),
+              help='Directory of cluster analysis output')
 @click.argument('args', nargs=-1)
-def cluster_cli(profiling_path, mode, args) -> None:
-    required_args = ('-d', profiling_path, '-m', mode)
+def cluster_cli(profiling_path, mode, cluster_analysis_output_path, args) -> None:
+    # forward '-cp' only when it was provided, so the backend falls back to the collection path
+    required_args = ('-d', profiling_path, '-m', mode)
+    if cluster_analysis_output_path:
+        required_args += ('-cp', cluster_analysis_output_path)
     cluster_analysis_main(required_args + args)
diff --git a/profiler/cluster_analyse/analysis/base_analysis.py b/profiler/cluster_analyse/analysis/base_analysis.py
index d7be4fc9cf6c9f7963e3f632b6ae0d51e3208fb9..5525ffdaad9d852f7df913ead3ccee2d78230ba6 100644
--- a/profiler/cluster_analyse/analysis/base_analysis.py
+++ b/profiler/cluster_analyse/analysis/base_analysis.py
@@ -26,9 +26,10 @@ from cluster_utils.data_transfer_adapter import DataTransferAdapter
 
 
 class BaseAnalysis:
-    MAX_RANKS = 1000
+    MAX_RANKS = 1000000
     def __init__(self, param: dict):
         self.collection_path = param.get(Constant.COLLECTION_PATH)
+        self.cluster_analysis_output_path = param.get(Constant.CLUSTER_ANALYSIS_OUTPUT_PATH)
         self.data_map = param.get(Constant.DATA_MAP)
         self.data_type = param.get(Constant.DATA_TYPE)
         self.communication_ops = []
@@ -81,7 +82,7 @@ class BaseAnalysis:
         output_comm_data = {}
         for key in self.comm_ops_struct:
             output_comm_data[str(key)] = self.comm_ops_struct.get(key)
-        FileManager.create_json_file(self.collection_path, output_comm_data, self.SAVED_JSON)
+        FileManager.create_json_file(self.cluster_analysis_output_path, output_comm_data, self.SAVED_JSON)
 
     def split_op_by_group(self):
         for single_op in self.communication_ops:
diff --git a/profiler/cluster_analyse/analysis/comm_matrix_analysis.py b/profiler/cluster_analyse/analysis/comm_matrix_analysis.py
index 8dc04471fe0a164fc859e51597d41028523f7a32..5d674aa52b4e05b456ec67738d3e5af548363438 100644
--- a/profiler/cluster_analyse/analysis/comm_matrix_analysis.py
+++ b/profiler/cluster_analyse/analysis/comm_matrix_analysis.py
@@ -30,7 +30,7 @@ class CommMatrixAnalysis(BaseAnalysis):
 
     def dump_db(self):
         res_comm_matrix = self.adapter.transfer_matrix_from_json_to_db(self.comm_ops_struct)
-        output_path = os.path.join(self.collection_path, Constant.CLUSTER_ANALYSIS_OUTPUT)
+        output_path = os.path.join(self.cluster_analysis_output_path, Constant.CLUSTER_ANALYSIS_OUTPUT)
         result_db = os.path.join(output_path, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER)
         DBManager.create_tables(result_db, self.COMMUNICATION_MATRIX_TABLE)
         conn, cursor = DBManager.create_connect_db(result_db)
diff --git a/profiler/cluster_analyse/analysis/communication_analysis.py b/profiler/cluster_analyse/analysis/communication_analysis.py
index 3f0a9b417e211b124b052cb5c5534f2fdbe5302e..40af810848fe8917a7482b8340ee7e496e3ee0e8 100644
--- a/profiler/cluster_analyse/analysis/communication_analysis.py
+++ b/profiler/cluster_analyse/analysis/communication_analysis.py
@@ -30,7 +30,7 @@ class CommunicationAnalysis(BaseAnalysis):
 
     def dump_db(self):
         res_comm_time, res_comm_bandwidth = self.adapter.transfer_comm_from_json_to_db(self.comm_ops_struct)
-        output_path = os.path.join(self.collection_path, Constant.CLUSTER_ANALYSIS_OUTPUT)
+        output_path = os.path.join(self.cluster_analysis_output_path, Constant.CLUSTER_ANALYSIS_OUTPUT)
         result_db = os.path.join(output_path, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER)
         DBManager.create_tables(result_db, self.COMMUNICATION_TIME_TABLE, self.COMMUNICATION_BANDWIDTH_TABLE)
         conn, cursor = DBManager.create_connect_db(result_db)
diff --git a/profiler/cluster_analyse/analysis/host_info_analysis.py b/profiler/cluster_analyse/analysis/host_info_analysis.py
index 563711080ed3a20923ce73ec595b84892492e9f6..6fcbb01227630690c1acbb94e3d19a2d6650fd8c 100644
--- a/profiler/cluster_analyse/analysis/host_info_analysis.py
+++ b/profiler/cluster_analyse/analysis/host_info_analysis.py
@@ -37,7 +37,7 @@ class HostInfoAnalysis(BaseAnalysis):
         self.dump_db()
 
     def dump_db(self):
-        output_path = os.path.join(self.collection_path, Constant.CLUSTER_ANALYSIS_OUTPUT)
+        output_path = os.path.join(self.cluster_analysis_output_path, Constant.CLUSTER_ANALYSIS_OUTPUT)
         result_db = os.path.join(output_path, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER)
         conn, curs = DBManager.create_connect_db(result_db)
         if not (conn and curs):
diff --git a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py
index 6a886fffa97b142e8267066117f561154d85b162..80eaf9549fe201cefe16084e29abe6c9c806a05c 100644
--- a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py
+++ b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py
@@ -27,6 +27,7 @@ class StepTraceTimeAnalysis:
 
     def __init__(self, param: dict):
         self.collection_path = param.get(Constant.COLLECTION_PATH)
+        self.cluster_analysis_output_path = param.get(Constant.CLUSTER_ANALYSIS_OUTPUT_PATH)
         self.data_map = param.get(Constant.DATA_MAP)
         self.communication_group = param.get(Constant.COMM_DATA_DICT, {}).get(Constant.COMMUNICATION_GROUP)
         self.step_time_dict = {}
@@ -56,9 +57,9 @@ class StepTraceTimeAnalysis:
             return
         if self.data_type == Constant.TEXT:
             headers = self.get_headers()
-            FileManager.create_csv_file(self.collection_path, self.step_data_list, self.CLUSTER_TRACE_TIME_CSV, headers)
+            FileManager.create_csv_file(self.cluster_analysis_output_path, self.step_data_list, self.CLUSTER_TRACE_TIME_CSV, headers)
         else:
-            output_path = os.path.join(self.collection_path, Constant.CLUSTER_ANALYSIS_OUTPUT)
+            output_path = os.path.join(self.cluster_analysis_output_path, Constant.CLUSTER_ANALYSIS_OUTPUT)
             result_db = os.path.join(output_path, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER)
             DBManager.create_tables(result_db, self.CLUSTER_TRACE_TIME_TABLE)
             column_len = DBManager.get_table_column_count(result_db, self.CLUSTER_TRACE_TIME_TABLE)
diff --git a/profiler/cluster_analyse/cluster_analysis.py b/profiler/cluster_analyse/cluster_analysis.py
index 171417c8879f5f561db6c1a338ed3a0266f28f05..92d243b5ec7254dda62b5667f9af6168b9decefa 100644
--- a/profiler/cluster_analyse/cluster_analysis.py
+++ b/profiler/cluster_analyse/cluster_analysis.py
@@ -40,6 +40,13 @@ class Interface:
         self.communication_ops = []
         self.matrix_ops = []
         self.origin_params = params
+        self.cluster_analysis_output_path = self._get_cluster_analysis_output_path(params)
+
+    def _get_cluster_analysis_output_path(self, params):
+        cluster_analysis_output_path = params.get(Constant.CLUSTER_ANALYSIS_OUTPUT_PATH)
+        if cluster_analysis_output_path:
+            return PathManager.get_realpath(cluster_analysis_output_path)
+        return self.collection_path
 
     def allocate_prof_data(self):
         ascend_pt_dirs = []
@@ -62,6 +69,7 @@ class Interface:
     def run(self):
         PathManager.check_input_directory_path(self.collection_path)
         PathManager.check_path_owner_consistent(self.collection_path)
+        PathManager.check_path_writeable(self.cluster_analysis_output_path)
         data_map, data_type = self.allocate_prof_data()
         if not data_map:
             print("[WARNING] Can not get rank info or profiling data.")
@@ -69,12 +77,13 @@ class Interface:
         if data_type == Constant.INVALID:
             print("[ERROR] The current folder contains both DB and other files. Please check.")
             return
-        FileManager.create_output_dir(self.collection_path)
+        FileManager.create_output_dir(self.cluster_analysis_output_path)
         params = {
             Constant.COLLECTION_PATH: self.collection_path,
             Constant.DATA_MAP: data_map,
             Constant.ANALYSIS_MODE: self.analysis_mode,
-            Constant.DATA_TYPE: data_type
+            Constant.DATA_TYPE: data_type,
+            Constant.CLUSTER_ANALYSIS_OUTPUT_PATH: self.cluster_analysis_output_path,
         }
         comm_data_dict = CommunicationGroupGenerator(params).generate()
         params[Constant.COMM_DATA_DICT] = comm_data_dict
@@ -86,10 +95,12 @@ def cluster_analysis_main(args=None):
     parser.add_argument('-d', '--collection_path', type=str, required=True,
                         help="profiling data path")
     parser.add_argument('-m', '--mode', choices=COMM_FEATURE_LIST, default='all',
                         help="different analysis mode")
+    parser.add_argument('-cp', '--cluster_analysis_output_path', type=str, help='Directory of cluster analysis output')
     args_parsed, _ = parser.parse_known_args(args=args)
     parameter = {
         Constant.COLLECTION_PATH: args_parsed.collection_path,
-        Constant.ANALYSIS_MODE: args_parsed.mode
+        Constant.ANALYSIS_MODE: args_parsed.mode,
+        Constant.CLUSTER_ANALYSIS_OUTPUT_PATH: args_parsed.cluster_analysis_output_path
     }
     Interface(parameter).run()
diff --git a/profiler/cluster_analyse/common_func/constant.py b/profiler/cluster_analyse/common_func/constant.py
index 2922d6a900fbbf243b61a73a13cf9caf945ec1c1..bd845da680286a0035ae24d23afeb547c4c8e603 100644
--- a/profiler/cluster_analyse/common_func/constant.py
+++ b/profiler/cluster_analyse/common_func/constant.py
@@ -68,6 +68,7 @@ class Constant(object):
     COMMUNICATION_OPS = "communication_ops"
     MATRIX_OPS = "matrix_ops"
     COLLECTION_PATH = "collection_path"
+    CLUSTER_ANALYSIS_OUTPUT_PATH = "cluster_analysis_output_path"
     COMMUNICATION_GROUP = "communication_group"
     TRANSPORT_TYPE = "Transport Type"
     COMM_DATA_DICT = "comm_data_dict"
diff --git a/profiler/cluster_analyse/communication_group/base_communication_group.py b/profiler/cluster_analyse/communication_group/base_communication_group.py
index 55f6801c2875698047849d39fbee3b9827c9ad28..91c4af7759c3a59cd1e97be0e8f189964f1639f7 100644
--- a/profiler/cluster_analyse/communication_group/base_communication_group.py
+++ b/profiler/cluster_analyse/communication_group/base_communication_group.py
@@ -26,6 +26,7 @@ from cluster_utils.data_transfer_adapter import DataTransferAdapter
 class BaseCommunicationGroup:
     def __init__(self, params: dict):
         self.collection_path = params.get(Constant.COLLECTION_PATH)
+        self.cluster_analysis_output_path = params.get(Constant.CLUSTER_ANALYSIS_OUTPUT_PATH)
         self.data_map = params.get(Constant.DATA_MAP)
         self.data_type = params.get(Constant.DATA_TYPE)
         self.analysis_mode = params.get(Constant.ANALYSIS_MODE)
diff --git a/profiler/cluster_analyse/communication_group/communication_db_group.py b/profiler/cluster_analyse/communication_group/communication_db_group.py
index 510dcd971357dfb4798e4d284a72fbb3f3a21859..f39bf65d9dcd5cfc733f96de55b676d3b0cbc4d8 100644
--- a/profiler/cluster_analyse/communication_group/communication_db_group.py
+++ b/profiler/cluster_analyse/communication_group/communication_db_group.py
@@ -38,7 +38,7 @@ class CommunicationDBGroup(BaseCommunicationGroup):
         return rank_id, comm_data, comm_matrix_data
 
     def dump_data(self):
-        output_path = os.path.join(self.collection_path, Constant.CLUSTER_ANALYSIS_OUTPUT)
+        output_path = os.path.join(self.cluster_analysis_output_path, Constant.CLUSTER_ANALYSIS_OUTPUT)
         result_db = os.path.join(output_path, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER)
         res = []
         for data_type, data_list in self.communication_group.items():
diff --git a/profiler/cluster_analyse/communication_group/communication_json_group.py b/profiler/cluster_analyse/communication_group/communication_json_group.py
index f6e01e3abfde4d8f180043a5bf9a50c6b5a4964c..b609b5de0e5051f26a8c9e5d30b71c3abd30b96f 100644
--- a/profiler/cluster_analyse/communication_group/communication_json_group.py
+++ b/profiler/cluster_analyse/communication_group/communication_json_group.py
@@ -27,7 +27,7 @@ class CommunicationJsonGroup(BaseCommunicationGroup):
         super().__init__(params)
 
     def dump_data(self):
-        FileManager.create_json_file(self.collection_path, self.communication_group, self.COMMUNICATION_GROUP_JSON)
+        FileManager.create_json_file(self.cluster_analysis_output_path, self.communication_group, self.COMMUNICATION_GROUP_JSON)
 
     def read_communication_func(self: any, params: tuple):
         if len(params) < 3:
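
For reference, a minimal usage sketch of the new parameter is shown below. It mirrors the parameter dict built in cluster_analysis_main; the import paths assume the code is run from inside profiler/cluster_analyse, and both directory paths are placeholders, not paths from this change.

# Sketch only: drive the cluster analysis backend with a dedicated, writable
# output directory; if the output-path key is omitted or empty, Interface
# falls back to the collection path (see _get_cluster_analysis_output_path).
from cluster_analysis import Interface            # assumed import layout
from common_func.constant import Constant         # assumed import layout

parameter = {
    Constant.COLLECTION_PATH: "/data/profiling_data",               # placeholder input directory
    Constant.ANALYSIS_MODE: "all",
    Constant.CLUSTER_ANALYSIS_OUTPUT_PATH: "/data/cluster_output",  # placeholder output directory
}
Interface(parameter).run()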