From c134b6404f2ae329cd331560ec02a5881d5eece5 Mon Sep 17 00:00:00 2001 From: simon Date: Wed, 31 Jul 2024 01:26:08 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E3=80=90cluster=5Fanalysis=E3=80=91?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=87=AA=E5=AE=9A=E4=B9=89=E8=BE=93=E5=87=BA?= =?UTF-8?q?=E8=B7=AF=E5=BE=84=E7=9A=84=E5=8F=AF=E9=80=89=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- profiler/cli/analyze_cli.py | 2 ++ profiler/cli/cluster_cli.py | 6 ++++-- .../cluster_analyse/analysis/base_analysis.py | 10 ++++++---- .../analysis/comm_matrix_analysis.py | 2 +- .../analysis/communication_analysis.py | 2 +- .../analysis/host_info_analysis.py | 2 +- .../analysis/step_trace_time_analysis.py | 5 +++-- profiler/cluster_analyse/cluster_analysis.py | 18 +++++++++++++++--- .../base_communication_group.py | 1 + .../communication_db_group.py | 2 +- .../communication_json_group.py | 2 +- 11 files changed, 36 insertions(+), 16 deletions(-) diff --git a/profiler/cli/analyze_cli.py b/profiler/cli/analyze_cli.py index f400a265b7..227ea15673 100644 --- a/profiler/cli/analyze_cli.py +++ b/profiler/cli/analyze_cli.py @@ -64,6 +64,8 @@ def analyze_cli(**kwargs): help='Directory of profiling data') @click.option('--benchmark_profiling_path', '-bp', 'benchmark_profiling_path', type=click.Path(), help='Directory of benchmark profiling data, used for compare performance') +@click.option('--cluster_analysis_output', '-o', 'cluster_analysis_output', type=click.Path(), + help='Directory of cluster analysis output') @click.option('--cann_version', '-cv', 'cann_version', type=click.Choice(constant.SUPPORTED_CANN_VERSION, case_sensitive=False), default=constant.DEFAULT_CANN_VERSION, diff --git a/profiler/cli/cluster_cli.py b/profiler/cli/cluster_cli.py index 93a4a638f2..4e253d9d1a 100644 --- a/profiler/cli/cluster_cli.py +++ b/profiler/cli/cluster_cli.py @@ -34,7 +34,9 @@ context_settings['ignore_unknown_options'] = True @click.option('--profiling_path', '-d', type=click.Path(), required=True, help='path of the profiling data') @click.option('--mode', '-m', type=click.Choice(ALL_FEATURE_LIST), default='all') +@click.option('--cluster_analysis_output', '-o', 'cluster_analysis_output', type=click.Path(), + help='Directory of cluster analysis output') @click.argument('args', nargs=-1) -def cluster_cli(profiling_path, mode, args) -> None: - required_args = ('-d', profiling_path, '-m', mode) +def cluster_cli(profiling_path, mode, cluster_analysis_output, args) -> None: + required_args = ('-d', profiling_path, '-m', mode, '-o', cluster_analysis_output) cluster_analysis_main(required_args + args) diff --git a/profiler/cluster_analyse/analysis/base_analysis.py b/profiler/cluster_analyse/analysis/base_analysis.py index 7209e9b56f..6c4a844d61 100644 --- a/profiler/cluster_analyse/analysis/base_analysis.py +++ b/profiler/cluster_analyse/analysis/base_analysis.py @@ -28,9 +28,10 @@ from cluster_utils.data_transfer_adapter import DataTransferAdapter class BaseAnalysis: - MAX_RANKS = 1000 + MAX_RANKS = 1000000 def __init__(self, param: dict): self.collection_path = param.get(Constant.COLLECTION_PATH) + self.cluster_analysis_output = param.get(Constant.CLUSTER_ANALYSIS_OUTPUT) self.data_map = param.get(Constant.DATA_MAP) self.data_type = param.get(Constant.DATA_TYPE) self.communication_ops = [] @@ -83,7 +84,7 @@ class BaseAnalysis: output_comm_data = {} for key in self.comm_ops_struct: output_comm_data[str(key)] = self.comm_ops_struct.get(key) - 
FileManager.create_json_file(self.collection_path, output_comm_data, self.SAVED_JSON) + FileManager.create_json_file(self.cluster_analysis_output, output_comm_data, self.SAVED_JSON) def split_op_by_group(self): for single_op in self.communication_ops: @@ -114,6 +115,7 @@ class BaseRecipeAnalysis: def __init__(self, params): self._params = params self._collection_dir = params.get(Constant.COLLECTION_PATH, "") + self._cluster_analysis_output = params.get(Constant.CLUSTER_ANALYSIS_OUTPUT, "") self._data_map = params.get(Constant.DATA_MAP, {}) self._recipe_name = params.get(Constant.RECIPE_NAME, "") self._mode = params.get(Constant.PARALLEL_MODE, "") @@ -166,7 +168,7 @@ class BaseRecipeAnalysis: return self._recipe_name def dump_data(self, data, file_name, table_name=None, index=True): - output_path = os.path.join(self._collection_dir, Constant.CLUSTER_ANALYSIS_OUTPUT) + output_path = os.path.join(self._cluster_analysis_output, Constant.CLUSTER_ANALYSIS_OUTPUT) if table_name: result_db = os.path.join(output_path, file_name) conn, cursor = DBManager.create_connect_db(result_db) @@ -190,7 +192,7 @@ class BaseRecipeAnalysis: return f"{name}-{i}" def _create_unique_output_dir(self): - output_dir = os.path.join(self._collection_dir, Constant.CLUSTER_ANALYSIS_OUTPUT, self._recipe_name) + output_dir = os.path.join(self._cluster_analysis_output, Constant.CLUSTER_ANALYSIS_OUTPUT, self._recipe_name) if os.path.exists(output_dir): return self._create_output_dir_name(output_dir) diff --git a/profiler/cluster_analyse/analysis/comm_matrix_analysis.py b/profiler/cluster_analyse/analysis/comm_matrix_analysis.py index 8dc04471fe..cc91eec35e 100644 --- a/profiler/cluster_analyse/analysis/comm_matrix_analysis.py +++ b/profiler/cluster_analyse/analysis/comm_matrix_analysis.py @@ -30,7 +30,7 @@ class CommMatrixAnalysis(BaseAnalysis): def dump_db(self): res_comm_matrix = self.adapter.transfer_matrix_from_json_to_db(self.comm_ops_struct) - output_path = os.path.join(self.collection_path, Constant.CLUSTER_ANALYSIS_OUTPUT) + output_path = os.path.join(self.cluster_analysis_output, Constant.CLUSTER_ANALYSIS_OUTPUT) result_db = os.path.join(output_path, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER) DBManager.create_tables(result_db, self.COMMUNICATION_MATRIX_TABLE) conn, cursor = DBManager.create_connect_db(result_db) diff --git a/profiler/cluster_analyse/analysis/communication_analysis.py b/profiler/cluster_analyse/analysis/communication_analysis.py index 3f0a9b417e..40af810848 100644 --- a/profiler/cluster_analyse/analysis/communication_analysis.py +++ b/profiler/cluster_analyse/analysis/communication_analysis.py @@ -30,7 +30,7 @@ class CommunicationAnalysis(BaseAnalysis): def dump_db(self): res_comm_time, res_comm_bandwidth = self.adapter.transfer_comm_from_json_to_db(self.comm_ops_struct) - output_path = os.path.join(self.collection_path, Constant.CLUSTER_ANALYSIS_OUTPUT) + output_path = os.path.join(self.cluster_analysis_output, Constant.CLUSTER_ANALYSIS_OUTPUT) result_db = os.path.join(output_path, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER) DBManager.create_tables(result_db, self.COMMUNICATION_TIME_TABLE, self.COMMUNICATION_BANDWIDTH_TABLE) conn, cursor = DBManager.create_connect_db(result_db) diff --git a/profiler/cluster_analyse/analysis/host_info_analysis.py b/profiler/cluster_analyse/analysis/host_info_analysis.py index 563711080e..98c52085e3 100644 --- a/profiler/cluster_analyse/analysis/host_info_analysis.py +++ b/profiler/cluster_analyse/analysis/host_info_analysis.py @@ -37,7 +37,7 @@ class 
HostInfoAnalysis(BaseAnalysis): self.dump_db() def dump_db(self): - output_path = os.path.join(self.collection_path, Constant.CLUSTER_ANALYSIS_OUTPUT) + output_path = os.path.join(self.cluster_analysis_output, Constant.CLUSTER_ANALYSIS_OUTPUT) result_db = os.path.join(output_path, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER) conn, curs = DBManager.create_connect_db(result_db) if not (conn and curs): diff --git a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py index 6a886fffa9..318b0517db 100644 --- a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py +++ b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py @@ -27,6 +27,7 @@ class StepTraceTimeAnalysis: def __init__(self, param: dict): self.collection_path = param.get(Constant.COLLECTION_PATH) + self.cluster_analysis_output = param.get(Constant.CLUSTER_ANALYSIS_OUTPUT) self.data_map = param.get(Constant.DATA_MAP) self.communication_group = param.get(Constant.COMM_DATA_DICT, {}).get(Constant.COMMUNICATION_GROUP) self.step_time_dict = {} @@ -56,9 +57,9 @@ class StepTraceTimeAnalysis: return if self.data_type == Constant.TEXT: headers = self.get_headers() - FileManager.create_csv_file(self.collection_path, self.step_data_list, self.CLUSTER_TRACE_TIME_CSV, headers) + FileManager.create_csv_file(self.cluster_analysis_output, self.step_data_list, self.CLUSTER_TRACE_TIME_CSV, headers) else: - output_path = os.path.join(self.collection_path, Constant.CLUSTER_ANALYSIS_OUTPUT) + output_path = os.path.join(self.cluster_analysis_output, Constant.CLUSTER_ANALYSIS_OUTPUT) result_db = os.path.join(output_path, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER) DBManager.create_tables(result_db, self.CLUSTER_TRACE_TIME_TABLE) column_len = DBManager.get_table_column_count(result_db, self.CLUSTER_TRACE_TIME_TABLE) diff --git a/profiler/cluster_analyse/cluster_analysis.py b/profiler/cluster_analyse/cluster_analysis.py index a8d01dcfe3..81f03bf0e0 100644 --- a/profiler/cluster_analyse/cluster_analysis.py +++ b/profiler/cluster_analyse/cluster_analysis.py @@ -67,6 +67,13 @@ class Interface: self.communication_ops = [] self.matrix_ops = [] self.origin_params = params + self.cluster_analysis_output = self._get_cluster_analysis_output(params) + + def _get_cluster_analysis_output(self, params): + cluster_analysis_output = params.get(Constant.CLUSTER_ANALYSIS_OUTPUT) + if cluster_analysis_output: + return PathManager.get_realpath(cluster_analysis_output) + return self.collection_path def allocate_prof_data(self): ascend_pt_dirs = [] @@ -89,6 +96,7 @@ class Interface: def run(self): PathManager.check_input_directory_path(self.collection_path) PathManager.check_path_owner_consistent(self.collection_path) + PathManager.check_path_writeable(self.cluster_analysis_output) data_map, data_type = self.allocate_prof_data() if not data_map: print("[WARNING] Can not get rank info or profiling data.") @@ -100,9 +108,10 @@ class Interface: if data_type != Constant.DB: print("[ERROR] The current analysis node only supports DB as input data. 
Please check.") return - FileManager.create_output_dir(self.collection_path, is_overwrite=True) + FileManager.create_output_dir(self.cluster_analysis_output, is_overwrite=True) params = { Constant.COLLECTION_PATH: self.collection_path, + Constant.CLUSTER_ANALYSIS_OUTPUT: self.cluster_analysis_output, Constant.DATA_MAP: data_map, Constant.DATA_TYPE: data_type, Constant.RECIPE_NAME: self.origin_params.get(Constant.RECIPE_NAME, ""), @@ -113,9 +122,10 @@ class Interface: params.update(params[Constant.RECIPE_CLASS].get_extra_argument(self.origin_params)) AnalysisFacade(params).recipe_analyze() else: - FileManager.create_output_dir(self.collection_path) + FileManager.create_output_dir(self.cluster_analysis_output) params = { Constant.COLLECTION_PATH: self.collection_path, + Constant.CLUSTER_ANALYSIS_OUTPUT: self.cluster_analysis_output, Constant.DATA_MAP: data_map, Constant.ANALYSIS_MODE: self.analysis_mode, Constant.DATA_TYPE: data_type @@ -130,10 +140,12 @@ def cluster_analysis_main(args=None): parser.add_argument('-d', '--collection_path', type=str, required=True, help="profiling data path") parser.add_argument('-m', '--mode', choices=ALL_FEATURE_LIST, default='all', help="different analysis mode") + parser.add_argument('-o', '--cluster_analysis_output', type=str, help='Directory of cluster analysis output') args_parsed, args_remained = parser.parse_known_args(args=args) parameter = { Constant.COLLECTION_PATH: args_parsed.collection_path, - Constant.ANALYSIS_MODE: args_parsed.mode + Constant.ANALYSIS_MODE: args_parsed.mode, + Constant.CLUSTER_ANALYSIS_OUTPUT: args_parsed.cluster_analysis_output } if args_parsed.mode in COMM_FEATURE_LIST: if args_remained: diff --git a/profiler/cluster_analyse/communication_group/base_communication_group.py b/profiler/cluster_analyse/communication_group/base_communication_group.py index 55f6801c28..2325d0e8c0 100644 --- a/profiler/cluster_analyse/communication_group/base_communication_group.py +++ b/profiler/cluster_analyse/communication_group/base_communication_group.py @@ -26,6 +26,7 @@ from cluster_utils.data_transfer_adapter import DataTransferAdapter class BaseCommunicationGroup: def __init__(self, params: dict): self.collection_path = params.get(Constant.COLLECTION_PATH) + self.cluster_analysis_output = params.get(Constant.CLUSTER_ANALYSIS_OUTPUT) self.data_map = params.get(Constant.DATA_MAP) self.data_type = params.get(Constant.DATA_TYPE) self.analysis_mode = params.get(Constant.ANALYSIS_MODE) diff --git a/profiler/cluster_analyse/communication_group/communication_db_group.py b/profiler/cluster_analyse/communication_group/communication_db_group.py index 510dcd9713..3180822492 100644 --- a/profiler/cluster_analyse/communication_group/communication_db_group.py +++ b/profiler/cluster_analyse/communication_group/communication_db_group.py @@ -38,7 +38,7 @@ class CommunicationDBGroup(BaseCommunicationGroup): return rank_id, comm_data, comm_matrix_data def dump_data(self): - output_path = os.path.join(self.collection_path, Constant.CLUSTER_ANALYSIS_OUTPUT) + output_path = os.path.join(self.cluster_analysis_output, Constant.CLUSTER_ANALYSIS_OUTPUT) result_db = os.path.join(output_path, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER) res = [] for data_type, data_list in self.communication_group.items(): diff --git a/profiler/cluster_analyse/communication_group/communication_json_group.py b/profiler/cluster_analyse/communication_group/communication_json_group.py index f6e01e3abf..4545abd191 100644 --- 
a/profiler/cluster_analyse/communication_group/communication_json_group.py +++ b/profiler/cluster_analyse/communication_group/communication_json_group.py @@ -27,7 +27,7 @@ class CommunicationJsonGroup(BaseCommunicationGroup): super().__init__(params) def dump_data(self): - FileManager.create_json_file(self.collection_path, self.communication_group, self.COMMUNICATION_GROUP_JSON) + FileManager.create_json_file(self.cluster_analysis_output, self.communication_group, self.COMMUNICATION_GROUP_JSON) def read_communication_func(self: any, params: tuple): if len(params) < 3: -- Gitee From 10af57983feb13f46f4f4c3586f292667d297391 Mon Sep 17 00:00:00 2001 From: fourones Date: Sat, 3 Aug 2024 14:50:08 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E3=80=90cluster=5Fanalysis=E3=80=91?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=87=AA=E5=AE=9A=E4=B9=89=E8=BE=93=E5=87=BA?= =?UTF-8?q?=E8=B7=AF=E5=BE=84=E7=9A=84=E5=8F=AF=E9=80=89=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dataset/cluster/cluster_dataset.py | 6 +++-- profiler/cli/analyze_cli.py | 4 ++-- profiler/cli/cluster_cli.py | 6 ++--- .../cluster_analyse/analysis/base_analysis.py | 10 ++++---- .../analysis/comm_matrix_analysis.py | 2 +- .../analysis/host_info_analysis.py | 2 +- .../analysis/step_trace_time_analysis.py | 6 ++--- profiler/cluster_analyse/cluster_analysis.py | 24 +++++++++---------- .../cluster_analyse/common_func/constant.py | 3 ++- .../base_communication_group.py | 2 +- .../communication_db_group.py | 2 +- .../communication_json_group.py | 2 +- 12 files changed, 36 insertions(+), 33 deletions(-) diff --git a/profiler/advisor/dataset/cluster/cluster_dataset.py b/profiler/advisor/dataset/cluster/cluster_dataset.py index e1163f1cdd..67fe0333f4 100644 --- a/profiler/advisor/dataset/cluster/cluster_dataset.py +++ b/profiler/advisor/dataset/cluster/cluster_dataset.py @@ -17,13 +17,14 @@ logger = logging.getLogger() class ClusterDataset(Dataset): def __init__(self, collection_path, data: dict, **kwargs) -> None: + self.cluster_analysis_output_path = kwargs.get(Constant.CLUSTER_ANALYSIS_OUTPUT_PATH, collection_path) super().__init__(collection_path, data) def is_cluster_analysis_output_exist(self): """ check whether input path is valid """ - for file in os.listdir(self.collection_path): + for file in os.listdir(self.cluster_analysis_output_path): if file == 'cluster_analysis_output': logger.info("[INFO]Cluster has been analyzed " "because of the existence of cluster analysis output directory.") @@ -36,7 +37,8 @@ class ClusterDataset(Dataset): return parameter = { Constant.COLLECTION_PATH: self.collection_path, - Constant.ANALYSIS_MODE: "all" + Constant.ANALYSIS_MODE: "all", + Constant.CLUSTER_ANALYSIS_OUTPUT_PATH: self.cluster_analysis_output_path } print("[INFO] cluster analysis is in the process, please wait...") try: diff --git a/profiler/cli/analyze_cli.py b/profiler/cli/analyze_cli.py index 227ea15673..72c468dcc2 100644 --- a/profiler/cli/analyze_cli.py +++ b/profiler/cli/analyze_cli.py @@ -64,7 +64,7 @@ def analyze_cli(**kwargs): help='Directory of profiling data') @click.option('--benchmark_profiling_path', '-bp', 'benchmark_profiling_path', type=click.Path(), help='Directory of benchmark profiling data, used for compare performance') -@click.option('--cluster_analysis_output', '-o', 'cluster_analysis_output', type=click.Path(), +@click.option('--cluster_analysis_output_path', '-cp', 'cluster_analysis_output_path', type=click.Path(), help='Directory of cluster analysis output') 
@click.option('--cann_version', '-cv', 'cann_version', type=click.Choice(constant.SUPPORTED_CANN_VERSION, case_sensitive=False), @@ -133,4 +133,4 @@ def analyze_schedule(**kwargs) -> None: help="enter the profiling type, selectable range ascend_pytorch_profiler, mslite ,msprof") @debug_option def analyze_computation(**kwargs) -> None: - _analyze(["computation"], **kwargs) \ No newline at end of file + _analyze(["computation"], **kwargs) diff --git a/profiler/cli/cluster_cli.py b/profiler/cli/cluster_cli.py index 4e253d9d1a..4a6d0db0ae 100644 --- a/profiler/cli/cluster_cli.py +++ b/profiler/cli/cluster_cli.py @@ -34,9 +34,9 @@ context_settings['ignore_unknown_options'] = True @click.option('--profiling_path', '-d', type=click.Path(), required=True, help='path of the profiling data') @click.option('--mode', '-m', type=click.Choice(ALL_FEATURE_LIST), default='all') -@click.option('--cluster_analysis_output', '-o', 'cluster_analysis_output', type=click.Path(), +@click.option('--cluster_analysis_output_path', '-cp', 'cluster_analysis_output_path', type=click.Path(), help='Directory of cluster analysis output') @click.argument('args', nargs=-1) -def cluster_cli(profiling_path, mode, cluster_analysis_output, args) -> None: - required_args = ('-d', profiling_path, '-m', mode, '-o', cluster_analysis_output) +def cluster_cli(profiling_path, mode, cluster_analysis_output_path, args) -> None: + required_args = ('-d', profiling_path, '-m', mode, '-cp', cluster_analysis_output_path) cluster_analysis_main(required_args + args) diff --git a/profiler/cluster_analyse/analysis/base_analysis.py b/profiler/cluster_analyse/analysis/base_analysis.py index 6c4a844d61..7db9732271 100644 --- a/profiler/cluster_analyse/analysis/base_analysis.py +++ b/profiler/cluster_analyse/analysis/base_analysis.py @@ -31,7 +31,7 @@ class BaseAnalysis: MAX_RANKS = 1000000 def __init__(self, param: dict): self.collection_path = param.get(Constant.COLLECTION_PATH) - self.cluster_analysis_output = param.get(Constant.CLUSTER_ANALYSIS_OUTPUT) + self.cluster_analysis_output_path = param.get(Constant.CLUSTER_ANALYSIS_OUTPUT_PATH) self.data_map = param.get(Constant.DATA_MAP) self.data_type = param.get(Constant.DATA_TYPE) self.communication_ops = [] @@ -84,7 +84,7 @@ class BaseAnalysis: output_comm_data = {} for key in self.comm_ops_struct: output_comm_data[str(key)] = self.comm_ops_struct.get(key) - FileManager.create_json_file(self.cluster_analysis_output, output_comm_data, self.SAVED_JSON) + FileManager.create_json_file(self.cluster_analysis_output_path, output_comm_data, self.SAVED_JSON) def split_op_by_group(self): for single_op in self.communication_ops: @@ -115,7 +115,7 @@ class BaseRecipeAnalysis: def __init__(self, params): self._params = params self._collection_dir = params.get(Constant.COLLECTION_PATH, "") - self._cluster_analysis_output = params.get(Constant.CLUSTER_ANALYSIS_OUTPUT, "") + self._cluster_analysis_output_path = params.get(Constant.CLUSTER_ANALYSIS_OUTPUT_PATH, "") self._data_map = params.get(Constant.DATA_MAP, {}) self._recipe_name = params.get(Constant.RECIPE_NAME, "") self._mode = params.get(Constant.PARALLEL_MODE, "") @@ -168,7 +168,7 @@ class BaseRecipeAnalysis: return self._recipe_name def dump_data(self, data, file_name, table_name=None, index=True): - output_path = os.path.join(self._cluster_analysis_output, Constant.CLUSTER_ANALYSIS_OUTPUT) + output_path = os.path.join(self._cluster_analysis_output_path, Constant.CLUSTER_ANALYSIS_OUTPUT) if table_name: result_db = os.path.join(output_path, file_name) conn, 
cursor = DBManager.create_connect_db(result_db) @@ -192,7 +192,7 @@ class BaseRecipeAnalysis: return f"{name}-{i}" def _create_unique_output_dir(self): - output_dir = os.path.join(self._cluster_analysis_output, Constant.CLUSTER_ANALYSIS_OUTPUT, self._recipe_name) + output_dir = os.path.join(self._cluster_analysis_output_path, Constant.CLUSTER_ANALYSIS_OUTPUT, self._recipe_name) if os.path.exists(output_dir): return self._create_output_dir_name(output_dir) diff --git a/profiler/cluster_analyse/analysis/comm_matrix_analysis.py b/profiler/cluster_analyse/analysis/comm_matrix_analysis.py index cc91eec35e..5d674aa52b 100644 --- a/profiler/cluster_analyse/analysis/comm_matrix_analysis.py +++ b/profiler/cluster_analyse/analysis/comm_matrix_analysis.py @@ -30,7 +30,7 @@ class CommMatrixAnalysis(BaseAnalysis): def dump_db(self): res_comm_matrix = self.adapter.transfer_matrix_from_json_to_db(self.comm_ops_struct) - output_path = os.path.join(self.cluster_analysis_output, Constant.CLUSTER_ANALYSIS_OUTPUT) + output_path = os.path.join(self.cluster_analysis_output_path, Constant.CLUSTER_ANALYSIS_OUTPUT) result_db = os.path.join(output_path, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER) DBManager.create_tables(result_db, self.COMMUNICATION_MATRIX_TABLE) conn, cursor = DBManager.create_connect_db(result_db) diff --git a/profiler/cluster_analyse/analysis/host_info_analysis.py b/profiler/cluster_analyse/analysis/host_info_analysis.py index 98c52085e3..6fcbb01227 100644 --- a/profiler/cluster_analyse/analysis/host_info_analysis.py +++ b/profiler/cluster_analyse/analysis/host_info_analysis.py @@ -37,7 +37,7 @@ class HostInfoAnalysis(BaseAnalysis): self.dump_db() def dump_db(self): - output_path = os.path.join(self.cluster_analysis_output, Constant.CLUSTER_ANALYSIS_OUTPUT) + output_path = os.path.join(self.cluster_analysis_output_path, Constant.CLUSTER_ANALYSIS_OUTPUT) result_db = os.path.join(output_path, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER) conn, curs = DBManager.create_connect_db(result_db) if not (conn and curs): diff --git a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py index 318b0517db..80eaf9549f 100644 --- a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py +++ b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py @@ -27,7 +27,7 @@ class StepTraceTimeAnalysis: def __init__(self, param: dict): self.collection_path = param.get(Constant.COLLECTION_PATH) - self.cluster_analysis_output = param.get(Constant.CLUSTER_ANALYSIS_OUTPUT) + self.cluster_analysis_output_path = param.get(Constant.CLUSTER_ANALYSIS_OUTPUT_PATH) self.data_map = param.get(Constant.DATA_MAP) self.communication_group = param.get(Constant.COMM_DATA_DICT, {}).get(Constant.COMMUNICATION_GROUP) self.step_time_dict = {} @@ -57,9 +57,9 @@ class StepTraceTimeAnalysis: return if self.data_type == Constant.TEXT: headers = self.get_headers() - FileManager.create_csv_file(self.cluster_analysis_output, self.step_data_list, self.CLUSTER_TRACE_TIME_CSV, headers) + FileManager.create_csv_file(self.cluster_analysis_output_path, self.step_data_list, self.CLUSTER_TRACE_TIME_CSV, headers) else: - output_path = os.path.join(self.cluster_analysis_output, Constant.CLUSTER_ANALYSIS_OUTPUT) + output_path = os.path.join(self.cluster_analysis_output_path, Constant.CLUSTER_ANALYSIS_OUTPUT) result_db = os.path.join(output_path, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER) DBManager.create_tables(result_db, self.CLUSTER_TRACE_TIME_TABLE) column_len = 
DBManager.get_table_column_count(result_db, self.CLUSTER_TRACE_TIME_TABLE) diff --git a/profiler/cluster_analyse/cluster_analysis.py b/profiler/cluster_analyse/cluster_analysis.py index 81f03bf0e0..54a10aabe1 100644 --- a/profiler/cluster_analyse/cluster_analysis.py +++ b/profiler/cluster_analyse/cluster_analysis.py @@ -67,12 +67,12 @@ class Interface: self.communication_ops = [] self.matrix_ops = [] self.origin_params = params - self.cluster_analysis_output = self._get_cluster_analysis_output(params) + self.cluster_analysis_output_path = self._get_cluster_analysis_output_path(params) - def _get_cluster_analysis_output(self, params): - cluster_analysis_output = params.get(Constant.CLUSTER_ANALYSIS_OUTPUT) - if cluster_analysis_output: - return PathManager.get_realpath(cluster_analysis_output) + def _get_cluster_analysis_output_path(self, params): + cluster_analysis_output_path = params.get(Constant.CLUSTER_ANALYSIS_OUTPUT_PATH) + if cluster_analysis_output_path: + return PathManager.get_realpath(cluster_analysis_output_path) return self.collection_path def allocate_prof_data(self): @@ -96,7 +96,7 @@ class Interface: def run(self): PathManager.check_input_directory_path(self.collection_path) PathManager.check_path_owner_consistent(self.collection_path) - PathManager.check_path_writeable(self.cluster_analysis_output) + PathManager.check_path_writeable(self.cluster_analysis_output_path) data_map, data_type = self.allocate_prof_data() if not data_map: print("[WARNING] Can not get rank info or profiling data.") @@ -108,10 +108,10 @@ class Interface: if data_type != Constant.DB: print("[ERROR] The current analysis node only supports DB as input data. Please check.") return - FileManager.create_output_dir(self.cluster_analysis_output, is_overwrite=True) + FileManager.create_output_dir(self.cluster_analysis_output_path, is_overwrite=True) params = { Constant.COLLECTION_PATH: self.collection_path, - Constant.CLUSTER_ANALYSIS_OUTPUT: self.cluster_analysis_output, + Constant.CLUSTER_ANALYSIS_OUTPUT_PATH: self.cluster_analysis_output_path, Constant.DATA_MAP: data_map, Constant.DATA_TYPE: data_type, Constant.RECIPE_NAME: self.origin_params.get(Constant.RECIPE_NAME, ""), @@ -122,10 +122,10 @@ class Interface: params.update(params[Constant.RECIPE_CLASS].get_extra_argument(self.origin_params)) AnalysisFacade(params).recipe_analyze() else: - FileManager.create_output_dir(self.cluster_analysis_output) + FileManager.create_output_dir(self.cluster_analysis_output_path) params = { Constant.COLLECTION_PATH: self.collection_path, - Constant.CLUSTER_ANALYSIS_OUTPUT: self.cluster_analysis_output, + Constant.CLUSTER_ANALYSIS_OUTPUT_PATH: self.cluster_analysis_output_path, Constant.DATA_MAP: data_map, Constant.ANALYSIS_MODE: self.analysis_mode, Constant.DATA_TYPE: data_type @@ -140,12 +140,12 @@ def cluster_analysis_main(args=None): parser.add_argument('-d', '--collection_path', type=str, required=True, help="profiling data path") parser.add_argument('-m', '--mode', choices=ALL_FEATURE_LIST, default='all', help="different analysis mode") - parser.add_argument('-o', '--cluster_analysis_output', type=str, help='Directory of cluster analysis output') + parser.add_argument('-cp', '--cluster_analysis_output_path', type=str, help='Directory of cluster analysis output') args_parsed, args_remained = parser.parse_known_args(args=args) parameter = { Constant.COLLECTION_PATH: args_parsed.collection_path, Constant.ANALYSIS_MODE: args_parsed.mode, - Constant.CLUSTER_ANALYSIS_OUTPUT: args_parsed.cluster_analysis_output + 
Constant.CLUSTER_ANALYSIS_OUTPUT_PATH: args_parsed.cluster_analysis_output_path } if args_parsed.mode in COMM_FEATURE_LIST: if args_remained: diff --git a/profiler/cluster_analyse/common_func/constant.py b/profiler/cluster_analyse/common_func/constant.py index 80f0374c1d..aba8e66d9b 100644 --- a/profiler/cluster_analyse/common_func/constant.py +++ b/profiler/cluster_analyse/common_func/constant.py @@ -68,6 +68,7 @@ class Constant(object): COMMUNICATION_OPS = "communication_ops" MATRIX_OPS = "matrix_ops" COLLECTION_PATH = "collection_path" + CLUSTER_ANALYSIS_OUTPUT_PATH = "cluster_analysis_output_path" COMMUNICATION_GROUP = "communication_group" TRANSPORT_TYPE = "Transport Type" COMM_DATA_DICT = "comm_data_dict" @@ -115,4 +116,4 @@ class Constant(object): CLUSTER_CUSTOM_ANALYSE_PATH = os.path.abspath(os.path.dirname(__file__)) ANALYSIS_PATH = os.path.join(CLUSTER_CUSTOM_ANALYSE_PATH, 'analysis') - CONCURRENT_MODE = "concurrent" \ No newline at end of file + CONCURRENT_MODE = "concurrent" diff --git a/profiler/cluster_analyse/communication_group/base_communication_group.py b/profiler/cluster_analyse/communication_group/base_communication_group.py index 2325d0e8c0..91c4af7759 100644 --- a/profiler/cluster_analyse/communication_group/base_communication_group.py +++ b/profiler/cluster_analyse/communication_group/base_communication_group.py @@ -26,7 +26,7 @@ from cluster_utils.data_transfer_adapter import DataTransferAdapter class BaseCommunicationGroup: def __init__(self, params: dict): self.collection_path = params.get(Constant.COLLECTION_PATH) - self.cluster_analysis_output = params.get(Constant.CLUSTER_ANALYSIS_OUTPUT) + self.cluster_analysis_output_path = params.get(Constant.CLUSTER_ANALYSIS_OUTPUT_PATH) self.data_map = params.get(Constant.DATA_MAP) self.data_type = params.get(Constant.DATA_TYPE) self.analysis_mode = params.get(Constant.ANALYSIS_MODE) diff --git a/profiler/cluster_analyse/communication_group/communication_db_group.py b/profiler/cluster_analyse/communication_group/communication_db_group.py index 3180822492..f39bf65d9d 100644 --- a/profiler/cluster_analyse/communication_group/communication_db_group.py +++ b/profiler/cluster_analyse/communication_group/communication_db_group.py @@ -38,7 +38,7 @@ class CommunicationDBGroup(BaseCommunicationGroup): return rank_id, comm_data, comm_matrix_data def dump_data(self): - output_path = os.path.join(self.cluster_analysis_output, Constant.CLUSTER_ANALYSIS_OUTPUT) + output_path = os.path.join(self.cluster_analysis_output_path, Constant.CLUSTER_ANALYSIS_OUTPUT) result_db = os.path.join(output_path, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER) res = [] for data_type, data_list in self.communication_group.items(): diff --git a/profiler/cluster_analyse/communication_group/communication_json_group.py b/profiler/cluster_analyse/communication_group/communication_json_group.py index 4545abd191..b609b5de0e 100644 --- a/profiler/cluster_analyse/communication_group/communication_json_group.py +++ b/profiler/cluster_analyse/communication_group/communication_json_group.py @@ -27,7 +27,7 @@ class CommunicationJsonGroup(BaseCommunicationGroup): super().__init__(params) def dump_data(self): - FileManager.create_json_file(self.cluster_analysis_output, self.communication_group, self.COMMUNICATION_GROUP_JSON) + FileManager.create_json_file(self.cluster_analysis_output_path, self.communication_group, self.COMMUNICATION_GROUP_JSON) def read_communication_func(self: any, params: tuple): if len(params) < 3: -- Gitee From 2be7be6b40e553b4e6e01c42cbad5676fcf4a063 
Mon Sep 17 00:00:00 2001 From: fourones Date: Sat, 3 Aug 2024 16:12:47 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E3=80=90cluster=5Fanalysis=E3=80=91?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=87=AA=E5=AE=9A=E4=B9=89=E8=BE=93=E5=87=BA?= =?UTF-8?q?=E8=B7=AF=E5=BE=84=E7=9A=84=E5=8F=AF=E9=80=89=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- profiler/advisor/dataset/cluster/cluster_dataset.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/profiler/advisor/dataset/cluster/cluster_dataset.py b/profiler/advisor/dataset/cluster/cluster_dataset.py index 67fe0333f4..0adf8161c7 100644 --- a/profiler/advisor/dataset/cluster/cluster_dataset.py +++ b/profiler/advisor/dataset/cluster/cluster_dataset.py @@ -47,7 +47,7 @@ class ClusterDataset(Dataset): raise ValueError(f"Cluster analyze backend failed:{e}") from e def load_csv_data(self, file_name, dataBean): - csv_path = os.path.join(self.collection_path, const.CLUSTER_ANALYSIS_OUTPUT, file_name) + csv_path = os.path.join(self.cluster_analysis_output_path, const.CLUSTER_ANALYSIS_OUTPUT, file_name) if not os.path.exists(csv_path): msg = "[ERROR] cluster_step_trace_time.csv doesn't exist, terminate analysis." raise RuntimeError(msg) @@ -55,7 +55,7 @@ class ClusterDataset(Dataset): return data def load_json_data(self, file_name): - json_path = os.path.join(self.collection_path, const.CLUSTER_ANALYSIS_OUTPUT, file_name) + json_path = os.path.join(self.cluster_analysis_output_path, const.CLUSTER_ANALYSIS_OUTPUT, file_name) if not os.path.exists(json_path): msg = "[ERROR] cluster_communication.json doesn't exist, terminate analysis." raise RuntimeError(msg) @@ -69,7 +69,7 @@ class ClusterStepTraceTimeDataset(ClusterDataset): def __init__(self, collection_path: str, data: dict, **kwargs): self._step_dict = defaultdict() - super().__init__(collection_path, data) + super().__init__(collection_path, data, **kwargs) def _parse(self): self.cluster_analyze() @@ -116,7 +116,7 @@ class ClusterCommunicationDataset(ClusterDataset): self.SDMA_TIME_MS: 0, self.SDMA_SIZE_MB: 0, }) - super().__init__(collection_path, data) + super().__init__(collection_path, data, **kwargs) @staticmethod def compute_ratio(dividend: float, divisor: float): -- Gitee From 98490c628a728742c56507f8a61da9790b6a6a93 Mon Sep 17 00:00:00 2001 From: fourones Date: Sat, 3 Aug 2024 16:36:01 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E3=80=90cluster=5Fanalysis=E3=80=91?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=87=AA=E5=AE=9A=E4=B9=89=E8=BE=93=E5=87=BA?= =?UTF-8?q?=E8=B7=AF=E5=BE=84=E7=9A=84=E5=8F=AF=E9=80=89=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- profiler/advisor/dataset/cluster/cluster_dataset.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/profiler/advisor/dataset/cluster/cluster_dataset.py b/profiler/advisor/dataset/cluster/cluster_dataset.py index 0adf8161c7..6d4e643b0a 100644 --- a/profiler/advisor/dataset/cluster/cluster_dataset.py +++ b/profiler/advisor/dataset/cluster/cluster_dataset.py @@ -17,7 +17,9 @@ logger = logging.getLogger() class ClusterDataset(Dataset): def __init__(self, collection_path, data: dict, **kwargs) -> None: - self.cluster_analysis_output_path = kwargs.get(Constant.CLUSTER_ANALYSIS_OUTPUT_PATH, collection_path) + self.cluster_analysis_output_path = kwargs.get(Constant.CLUSTER_ANALYSIS_OUTPUT_PATH) + if not self.cluster_analysis_output_path: + self.cluster_analysis_output_path = collection_path 
super().__init__(collection_path, data) def is_cluster_analysis_output_exist(self): -- Gitee
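
Note for readers skimming the series: the four patches add an optional output-path argument (-o in patch 1, renamed to -cp/--cluster_analysis_output_path in patch 2), thread it through the analysis params as Constant.CLUSTER_ANALYSIS_OUTPUT_PATH, and fall back to the profiling collection path when the flag is omitted; patches 3 and 4 pass the same value into the advisor's ClusterDataset via kwargs with the same fallback. The sketch below is NOT code from the repository; it only mirrors that resolution logic. The "cluster_analysis_output" subdirectory name and the -d/-cp flags are taken from the diffs, while resolve_output_root and build_output_dir are hypothetical helpers and os.path.realpath stands in for the project's PathManager.get_realpath.

    # Standalone sketch -- not repository code. Mirrors the output-path
    # resolution introduced by this series: an optional -cp argument that,
    # when omitted, falls back to the profiling collection path, with results
    # written under a fixed "cluster_analysis_output" subdirectory.
    import argparse
    import os

    # Fixed subdirectory name; matches the value the advisor checks for
    # ("if file == 'cluster_analysis_output'") and Constant.CLUSTER_ANALYSIS_OUTPUT.
    CLUSTER_ANALYSIS_OUTPUT = "cluster_analysis_output"


    def resolve_output_root(collection_path, cluster_analysis_output_path=None):
        """Return the directory analysis results should be written under.

        os.path.realpath stands in for the project's PathManager.get_realpath.
        """
        if cluster_analysis_output_path:
            return os.path.realpath(cluster_analysis_output_path)
        # Default: write next to the profiling data, same as before the patches.
        return collection_path


    def build_output_dir(output_root):
        """Join the resolved root with the fixed subdirectory, as the DB dump paths in the diffs do."""
        return os.path.join(output_root, CLUSTER_ANALYSIS_OUTPUT)


    if __name__ == "__main__":
        parser = argparse.ArgumentParser()
        parser.add_argument("-d", "--collection_path", required=True, help="profiling data path")
        parser.add_argument("-cp", "--cluster_analysis_output_path",
                            help="directory of cluster analysis output (optional)")
        args = parser.parse_args()

        root = resolve_output_root(args.collection_path, args.cluster_analysis_output_path)
        print(build_output_dir(root))

When -cp is omitted the resolved directory is identical to the pre-patch behaviour (<collection_path>/cluster_analysis_output), so existing callers are unaffected; when it is supplied, Interface.run() checks the resolved path is writeable (PathManager.check_path_writeable) before FileManager.create_output_dir creates the subdirectory.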