diff --git a/profiler/msprof_analyze/cluster_analyse/analysis/msprof_step_trace_time_adapter.py b/profiler/msprof_analyze/cluster_analyse/analysis/msprof_step_trace_time_adapter.py
index 799fd86a477b5ea3e0b59e1831fdc5a7ff398a6b..6fd09389935e5c759ab2ed43d428b952953b0c35 100644
--- a/profiler/msprof_analyze/cluster_analyse/analysis/msprof_step_trace_time_adapter.py
+++ b/profiler/msprof_analyze/cluster_analyse/analysis/msprof_step_trace_time_adapter.py
@@ -12,8 +12,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import os
+import re
 
-from msprof_analyze.cluster_analyse.prof_bean.step_trace_time_bean import StepTraceTimeBean
 from msprof_analyze.prof_common.utils import convert_to_float
 from msprof_analyze.prof_common.file_manager import FileManager
 from msprof_analyze.cluster_analyse.common_func.time_range_calculator import RangeCaculator
@@ -44,6 +45,26 @@ class MsprofStepTraceTimeAdapter:
                       self.COMMUNICATION: 0, self.FREE: 0, self.STAGE: 0, self.BUBBLE: 0,
                       self.COMM_NOT_OVERLAP_EXCLUDE_RECEIVE: 0, self.PREPARE: 0}
 
+    @staticmethod
+    def find_msprof_json(path):
+        msprof_pattern = r'^msprof_\d{14}\.json$'
+        msprof_slice_pattern = r'^msprof_slice_\d{1}_\d{14}\.json$'
+        msprof_dict, msprof_slice_dict = {}, {}
+        for file_name in os.listdir(path):
+            if re.match(msprof_pattern, file_name):
+                timestamp = re.search(r"\d{14}", file_name).group()
+                msprof_dict.setdefault(timestamp, []).append(os.path.join(path, file_name))
+            elif re.match(msprof_slice_pattern, file_name):
+                timestamp = re.search(r"\d{14}", file_name).group()
+                msprof_slice_dict.setdefault(timestamp, []).append(os.path.join(path, file_name))
+        if msprof_dict:
+            max_timestamp = max(msprof_dict.keys())
+            return msprof_dict.get(max_timestamp)
+        if msprof_slice_dict:
+            max_timestamp = max(msprof_slice_dict.keys())
+            return msprof_slice_dict.get(max_timestamp)
+        return []
+
     def generate_step_trace_time_data(self):
         json_str = []
         for file_path in self.file_path:
@@ -63,7 +84,7 @@ class MsprofStepTraceTimeAdapter:
         self._data[self.OVERLAPPED] = self._data[self.COMMUNICATION] - self._data[self.COMM_NOT_OVERLAP]
         e2e_time = self._data[self.FREE] + self._data[self.COMPUTE] + self._data[self.COMM_NOT_OVERLAP]
         self._data[self.STAGE] = e2e_time - self._data[self.BUBBLE]
-        return [StepTraceTimeBean(self._data)]
+        return self._data
 
 
 class MsprofStepTraceTimeDBAdapter(MsprofStepTraceTimeAdapter):
@@ -100,12 +121,10 @@ class MsprofStepTraceTimeDBAdapter(MsprofStepTraceTimeAdapter):
         self._data[self.OVERLAPPED] = self._data[self.COMMUNICATION] - self._data[self.COMM_NOT_OVERLAP]
         e2e_time = self._data[self.FREE] + self._data[self.COMPUTE] + self._data[self.COMM_NOT_OVERLAP]
         self._data[self.STAGE] = e2e_time - self._data[self.BUBBLE]
-        return [[self._data[self.STEP], self._data[self.COMPUTE] / Constant.NS_TO_US,
-                 self._data[self.COMM_NOT_OVERLAP] / Constant.NS_TO_US, self._data[self.OVERLAPPED] / Constant.NS_TO_US,
-                 self._data[self.COMMUNICATION] / Constant.NS_TO_US, self._data[self.FREE] / Constant.NS_TO_US,
-                 self._data[self.STAGE] / Constant.NS_TO_US, self._data[self.BUBBLE] / Constant.NS_TO_US,
-                 self._data[self.COMM_NOT_OVERLAP_EXCLUDE_RECEIVE] / Constant.NS_TO_US,
-                 self._data[self.PREPARE] / Constant.NS_TO_US]]
+
+        # convert time unit, ns to us; Step is not a duration, so it is kept as-is
+        self._data = {k: v / Constant.NS_TO_US if k != self.STEP else v for k, v in self._data.items()}
+        return self._data
 
     def _init_task_info_from_db(self):
         db_path = self.file_path.get(Constant.PROFILER_DB_PATH)
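The relocated find_msprof_json helper prefers complete msprof exports over sliced ones and, within each group, keeps only the files of the most recent export. A minimal illustrative sketch of that selection rule (file names invented, not part of the patch):

    import re

    files = ["msprof_20240101090000.json", "msprof_20240102090000.json",
             "msprof_slice_0_20240103090000.json"]
    plain = [f for f in files if re.match(r"^msprof_\d{14}\.json$", f)]
    # Fixed-width 14-digit timestamps compare correctly as strings, so max()
    # picks the newest export; slice files are only used when no plain export exists.
    latest = max(plain, key=lambda f: re.search(r"\d{14}", f).group())
    assert latest == "msprof_20240102090000.json"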
diff --git a/profiler/msprof_analyze/cluster_analyse/analysis/step_trace_time_analysis.py b/profiler/msprof_analyze/cluster_analyse/analysis/step_trace_time_analysis.py
index 5514d4ec25f6c20ee4bb2da1b19e6e4a86e78d3e..fc06675f028808d6fd86446197725fe8ceb01457 100644
--- a/profiler/msprof_analyze/cluster_analyse/analysis/step_trace_time_analysis.py
+++ b/profiler/msprof_analyze/cluster_analyse/analysis/step_trace_time_analysis.py
@@ -14,11 +14,13 @@
 # limitations under the License.
 import os
 import re
+import pandas as pd
 
+from msprof_analyze.cluster_analyse.analysis.step_trace_time_parser import StepTraceTimeParser
 from msprof_analyze.prof_common.db_manager import DBManager
 from msprof_analyze.cluster_analyse.common_func.utils import increase_shared_value
 from msprof_analyze.cluster_analyse.cluster_utils.parallel_strategy_calculator import ParallelStrategyCalculator
-from msprof_analyze.cluster_analyse.prof_bean.step_trace_time_bean import StepTraceTimeBean
+from msprof_analyze.cluster_analyse.prof_bean.step_trace_time_bean import StepTraceTimeData
 from msprof_analyze.prof_common.constant import Constant
 from msprof_analyze.prof_common.file_manager import FileManager
 from msprof_analyze.prof_common.logger import get_logger
@@ -35,14 +37,19 @@ class StepTraceTimeAnalysis:
     CLUSTER_TRACE_TIME_TABLE = "ClusterStepTraceTime"
     PROFILER_METADATA_JSON = "profiler_metadata.json"
     PARALLEL_HEADERS = ["DP Index", "PP Index", "TP Index"]
+    CLUSTER_CSV_HEADERS = ["Step", "Type", "Index", "Computing", "Communication(Not Overlapped)", "Overlapped",
+                           "Communication", "Free", "Stage", "Bubble",
+                           "Communication(Not Overlapped and Exclude Receive)", "Preparing"]
+    STANDARD_COLUMN_ORDER = ["Step", "Type", "Index", "Comp", "Comm_non_overlap", "Overlap", "Comm", "Free", "Stage",
+                             "Bubble", "Comm_non_overlap_excl_recv", "Prep"]
 
     def __init__(self, param: dict):
         self.collection_path = param.get(Constant.COLLECTION_PATH)
         self.cluster_analysis_output_path = param.get(Constant.CLUSTER_ANALYSIS_OUTPUT_PATH)
         self.data_map = param.get(Constant.DATA_MAP)
         self.communication_data_dict = param.get(Constant.COMM_DATA_DICT, {})
-        self.step_time_dict = {}
-        self.step_data_list = []
+        self.step_df = pd.DataFrame()
+        self.header_to_column_mapping = {}
         self.data_type = param.get(Constant.DATA_TYPE)
         self.data_simplification = param.get(Constant.DATA_SIMPLIFICATION)
         self.distributed_args = None
@@ -58,27 +65,8 @@ class StepTraceTimeAnalysis:
             ret.append(max(item))
         return ret
 
-    @staticmethod
-    def find_msprof_json(path):
-        msprof_pattern = r'^msprof_\d{14}\.json$'
-        msprof_slice_pattern = r'^msprof_slice_\d{1}_\d{14}\.json$'
-        msprof_dict, msprof_slice_dict = {}, {}
-        for file_name in os.listdir(path):
-            if re.match(msprof_pattern, file_name):
-                timestamp = re.search(r"\d{14}", file_name).group()
-                msprof_dict.setdefault(timestamp, []).append(os.path.join(path, file_name))
-            elif re.match(msprof_slice_pattern, file_name):
-                timestamp = re.search(r"\d{14}", file_name).group()
-                msprof_slice_dict.setdefault(timestamp, []).append(os.path.join(path, file_name))
-        if msprof_dict:
-            max_timestamp = max(msprof_dict.keys())
-            return msprof_dict.get(max_timestamp)
-        if msprof_slice_dict:
-            max_timestamp = max(msprof_slice_dict.keys())
-            return msprof_slice_dict.get(max_timestamp)
-        return []
-
     def run(self, completed_processes, lock):
+        self.load_distributed_args()
         self.load_step_trace_time_data()
         self.analyze_step_time()
         self.partition_ranks_data()
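The two new class constants defined above are positionally aligned: STANDARD_COLUMN_ORDER holds the internal short column names in the same order as the public CSV headers in CLUSTER_CSV_HEADERS. A quick illustrative check of that correspondence (subset only):

    # Step/Type/Index are shared; each duration column pairs a short name with a header.
    pairs = dict(zip(
        ["Comp", "Comm_non_overlap", "Overlap", "Comm", "Free"],
        ["Computing", "Communication(Not Overlapped)", "Overlapped", "Communication", "Free"],
    ))
    assert pairs["Comm_non_overlap"] == "Communication(Not Overlapped)"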
@@ -102,128 +90,109 @@ class StepTraceTimeAnalysis:
             self.distributed_args = None
             return
 
-        if len(parallelism_map) > len(self.step_time_dict):
+        step_df_ranks = self.step_df[self.step_df['Type'] == Constant.RANK]['Index'].unique().tolist()
+        if len(parallelism_map) > len(step_df_ranks):
             missing_rank_ids = [
                 rank_id
                 for rank_id in range(len(parallelism_map))
-                if rank_id not in self.step_time_dict
+                if rank_id not in step_df_ranks
             ]
             logger.warning("Step trace data length should equal to real rank numbers, but get step data length ="
                            "%s, real rank numbers = %s, maybe lost some rank ids = %s, please check your profiling "
-                           "data.", str(len(self.step_time_dict)), str(len(parallelism_map)), str(missing_rank_ids))
+                           "data.", str(len(step_df_ranks)), str(len(parallelism_map)), str(missing_rank_ids))
 
-        if len(parallelism_map) < len(self.step_time_dict):
+        if len(parallelism_map) < len(step_df_ranks):
             logger.error("Step trace data length should equal to real rank numbers, but get step data length = %s,"
                          " real rank numbers = %s, maybe parallel params in profiler_metadata.json is error, "
                          "please check your metadata data.",
-                         str(len(self.step_time_dict)), str(len(parallelism_map)))
+                         str(len(step_df_ranks)), str(len(parallelism_map)))
             self.distributed_args = None
             return
 
-        for step_data in self.step_data_list:
-            rank_id = step_data[2]
-            if isinstance(rank_id, int):
-                # type is rank, rank_id is int
-                step_data.extend(list(parallelism_map[rank_id])
-                                 if parallelism_map[rank_id] else ['NA'] * len(self.PARALLEL_HEADERS))
-            else:
-                # type is stage, rank_id is tuple
-                step_data.extend(['NA'] * len(self.PARALLEL_HEADERS))
+        # Add parallel strategy columns
+        for header in self.PARALLEL_HEADERS:
+            self.step_df[header] = 'NA'
+
+        # Update parallel strategy values for rank type rows; parallelism_map is indexed by rank id
+        rank_mask = self.step_df['Type'] == Constant.RANK
+        for idx in self.step_df[rank_mask].index:
+            rank_id = self.step_df.loc[idx, 'Index']
+            if isinstance(rank_id, int) and rank_id < len(parallelism_map):
+                parallel_values = parallelism_map[rank_id]
+                if parallel_values:
+                    for header, value in zip(self.PARALLEL_HEADERS, parallel_values):
+                        self.step_df.loc[idx, header] = value
 
     def dump_data(self):
-        if not self.step_data_list:
+        if self.step_df.empty:
             logger.warning("Can't get step time info!")
             return
+        output_path = os.path.join(self.cluster_analysis_output_path, Constant.CLUSTER_ANALYSIS_OUTPUT)
         if self.data_type == Constant.TEXT:
-            headers = self.get_headers()
-            FileManager.create_csv_file(self.cluster_analysis_output_path, self.step_data_list,
-                                        self.CLUSTER_TRACE_TIME_CSV, headers)
+            self._dump_text(output_path)
         else:
-            output_path = os.path.join(self.cluster_analysis_output_path, Constant.CLUSTER_ANALYSIS_OUTPUT)
-            result_db = os.path.join(output_path, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER)
-            DBManager.create_tables(result_db, self.CLUSTER_TRACE_TIME_TABLE)
-            column_len = DBManager.get_table_column_count(result_db, self.CLUSTER_TRACE_TIME_TABLE)
-            data_len = len(self.step_data_list[0])
-            if data_len < column_len:
-                for data in self.step_data_list:
-                    data.extend([0] * (column_len - data_len))
-            conn, cursor = DBManager.create_connect_db(result_db)
-            sql = "insert into {} values ({value})".format(self.CLUSTER_TRACE_TIME_TABLE,
-                                                           value="?," * (len(self.step_data_list[0]) - 1) + "?")
-            DBManager.executemany_sql(conn, sql, self.step_data_list)
-            DBManager.destroy_db_connect(conn, cursor)
+            self._dump_db(output_path)
 
     def load_step_trace_time_data(self):
-        for rank_id, profiling_dir_path in self.data_map.items():
+        if self.data_type == Constant.TEXT:
+            parser_method = StepTraceTimeParser.parse_from_msprof_json if self.is_msprof \
+                else StepTraceTimeParser.parse_from_step_trace_time_csv
+        else:  # DB case
+            if self.is_msprof:
+                parser_method = StepTraceTimeParser.parse_from_msprof_db
+            else:
+                parser_method = StepTraceTimeParser.parse_from_mindspore_db if self.is_mindspore \
+                    else StepTraceTimeParser.parse_from_analysis_db
+
+        step_trace_time = StepTraceTimeParser.process_cluster_data(self.data_map, parser_method)
+        self.step_df = step_trace_time.get_cluster_data()
+        self.step_df['Type'] = Constant.RANK
+        self.step_df = self.step_df.rename(columns={'Device_id': 'Index'})
+        self.header_to_column_mapping = step_trace_time.get_columns_mapping()
+
+    def load_distributed_args(self):
+        for _, profiling_dir_path in self.data_map.items():
             metadata_path = os.path.join(profiling_dir_path, self.PROFILER_METADATA_JSON)
-            if not self.distributed_args and os.path.exists(metadata_path):
+            if os.path.exists(metadata_path):
                 metadata = FileManager.read_json_file(metadata_path)
                 self.distributed_args = metadata.get(Constant.DISTRIBUTED_ARGS, None) if metadata else None
-            if self.data_type == Constant.TEXT:
-                if self.is_msprof:
-                    msprof_json = self.find_msprof_json(os.path.join(profiling_dir_path, "mindstudio_profiler_output"))
-                    self.step_time_dict[rank_id] = MsprofStepTraceTimeAdapter(
-                        msprof_json).generate_step_trace_time_data()
-                else:
-                    step_time_file = os.path.join(profiling_dir_path, Constant.SINGLE_OUTPUT, Constant.STEP_TIME_CSV)
-                    if os.path.exists(step_time_file):
-                        self.step_time_dict[rank_id] = FileManager.read_csv_file(step_time_file, StepTraceTimeBean)
-            else:
-                if self.is_msprof or self.is_mindspore:
-                    profiler_db = MsprofDataPreprocessor.get_msprof_profiler_db_path(profiling_dir_path) if \
-                        self.is_msprof else os.path.join(profiling_dir_path, Constant.SINGLE_OUTPUT,
-                                                         f"ascend_mindspore_profiler_{rank_id}.db")
-                    self.step_time_dict[rank_id] = MsprofStepTraceTimeDBAdapter(
-                        {Constant.PROFILER_DB_PATH: profiler_db}).generate_step_trace_time_data()
-                else:
-                    step_time_file = os.path.join(profiling_dir_path, Constant.SINGLE_OUTPUT,
-                                                  Constant.DB_COMMUNICATION_ANALYZER)
-                    if (os.path.exists(step_time_file) and
-                            DBManager.check_tables_in_db(step_time_file, Constant.TABLE_STEP_TRACE)):
-                        conn, cursor = DBManager.create_connect_db(step_time_file)
-                        sql = "select * from {0}".format(Constant.TABLE_STEP_TRACE)
-                        data = DBManager.fetch_all_data(cursor, sql, is_dict=False)
-                        self.step_time_dict[rank_id] = data
-                        DBManager.destroy_db_connect(conn, cursor)
-            if not self.step_time_dict.get(rank_id):
-                logger.warning("Rank %s does not have a valid step_trace_time data in %s file.",
-                               str(rank_id), str(self.data_type))
+            if self.distributed_args:
+                return
 
     def analyze_step_time(self):
-        for rank_id, data_bean_list in self.step_time_dict.items():
-            for data_bean in data_bean_list:
-                if self.data_type == Constant.TEXT:
-                    self.step_data_list.append([data_bean.step, Constant.RANK, rank_id] + data_bean.row)
-                else:
-                    self.step_data_list.append([data_bean[0], Constant.RANK, rank_id] + list(data_bean[1:]))
-
+        # Process stage data
         stage_list = self.generate_stage_group_list()
         if not stage_list:
            return
-        step_group_dict = {}
-        for data_list in self.step_data_list:
-            stage_group = tuple()
-            for stage in stage_list:
-                if data_list[2] in stage:
-                    stage_group = tuple(stage)
-                    break
-            key = (data_list[0], stage_group)
-            step_group_dict.setdefault(key, []).append(data_list[3:])
-
-        for key, data_group_list in step_group_dict.items():
-            if self.data_type == Constant.TEXT:
-                self.step_data_list.append([key[0], Constant.STAGE, key[1]] + self.get_max_data_row(data_group_list))
+
+        # Group data by step and stage
+        stage_data = []
+        # Handle both None and numeric steps
+        for step in self.step_df['Step'].unique():
+            if pd.isna(step):
+                step_data = self.step_df[self.step_df['Step'].isna()]
             else:
-                index = "(" + ",".join(str(i) for i in key[1]) + ")"
-                self.step_data_list.append([key[0], Constant.STAGE, index] + self.get_max_data_row(data_group_list))
+                step_data = self.step_df[self.step_df['Step'] == step]
+            for stage in stage_list:
+                stage_ranks = set(stage)
+                stage_rows = step_data[step_data['Index'].isin(stage_ranks)]
+                if not stage_rows.empty:
+                    # Calculate max values for numeric columns excluding Step, Index, and Type
+                    exclude_cols = ['Step', 'Index', 'Type']
+                    max_cols = [col for col in self.step_df.columns if col not in exclude_cols]
+                    max_values = stage_rows[max_cols].max()
+                    stage_row = {'Step': step, 'Type': Constant.STAGE, 'Index': tuple(stage)}
+                    stage_row.update(max_values.to_dict())
+                    stage_data.append(stage_row)
+
+        # Add stage data to DataFrame
+        if stage_data:
+            stage_df = pd.DataFrame(stage_data)
+            self.step_df = pd.concat([self.step_df, stage_df], ignore_index=True)
 
     def get_headers(self):
-        if self.step_time_dict:
-            for rank in self.step_time_dict:
-                if self.step_time_dict.get(rank) and self.distributed_args:
-                    return self.step_time_dict[rank][0].all_headers + self.PARALLEL_HEADERS
-                elif self.step_time_dict.get(rank):
-                    return self.step_time_dict[rank][0].all_headers
+        if not self.step_df.empty:
+            return list(self.step_df.columns)
         return []
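analyze_step_time now derives each stage row by taking column-wise maxima over the rank rows that belong to that stage within one step. A minimal illustrative rerun of that aggregation, on invented numbers:

    import pandas as pd

    df = pd.DataFrame([
        {"Step": 0, "Type": "rank", "Index": 0, "Comp": 1.0, "Free": 0.2},
        {"Step": 0, "Type": "rank", "Index": 1, "Comp": 3.0, "Free": 0.1},
    ])
    stage = [0, 1]
    rows = df[(df["Step"] == 0) & (df["Index"].isin(set(stage)))]
    max_values = rows[["Comp", "Free"]].max()  # column-wise maxima across the stage's ranks
    stage_row = {"Step": 0, "Type": "stage", "Index": tuple(stage), **max_values.to_dict()}
    # stage_row == {'Step': 0, 'Type': 'stage', 'Index': (0, 1), 'Comp': 3.0, 'Free': 0.2}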
@@ -238,3 +207,50 @@ class StepTraceTimeAnalysis:
         stage_analyzer = StageInfoAnalysis(params)
         stage_list = stage_analyzer.run()
         return stage_list
+
+    def _dump_text(self, output_path):
+        save_df = self.step_df.copy()
+        existing_columns = [col for col in self.STANDARD_COLUMN_ORDER if col in save_df.columns]
+        existing_columns.extend([col for col in self.PARALLEL_HEADERS if col in save_df.columns])
+        save_df = save_df[existing_columns]  # Reorder columns
+        try:
+            if self.header_to_column_mapping:
+                column_to_header = {v: k for k, v in self.header_to_column_mapping.items()}
+                save_df = save_df.rename(columns=column_to_header)
+            save_df.to_csv(os.path.join(output_path, self.CLUSTER_TRACE_TIME_CSV), index=False)
+        except Exception as err:
+            logger.error(f"Dump {self.CLUSTER_TRACE_TIME_CSV} failed, err: {err}")
+
+    def _dump_db(self, output_path):
+        save_df = self.step_df.copy()
+        # Process stage type Index for DB data
+        stage_mask = save_df['Type'] == Constant.STAGE
+        if stage_mask.any():
+            save_df.loc[stage_mask, 'Index'] = save_df.loc[stage_mask, 'Index'].apply(
+                lambda x: "(" + ",".join(str(i) for i in x) + ")" if isinstance(x, tuple) else x
+            )
+        # Reorder columns
+        existing_columns = [col for col in self.STANDARD_COLUMN_ORDER if col in save_df.columns]
+        existing_columns.extend([col for col in self.PARALLEL_HEADERS if col in save_df.columns])
+        save_df = save_df[existing_columns]
+
+        # Add missing parallel headers with None values
+        for col in self.PARALLEL_HEADERS:
+            if col not in save_df:
+                save_df[col] = None
+        result_db = os.path.join(output_path, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER)
+        # Initialize the handles so the finally block is safe even if table
+        # creation fails before the connection is opened.
+        conn, cursor = None, None
+        try:
+            DBManager.create_tables(result_db, self.CLUSTER_TRACE_TIME_TABLE)
+            data = save_df.values.tolist()
+            conn, cursor = DBManager.create_connect_db(result_db)
+            sql = "insert into {} values ({value})".format(self.CLUSTER_TRACE_TIME_TABLE,
+                                                           value="?," * (len(save_df.columns) - 1) + "?")
+            DBManager.executemany_sql(conn, sql, data)
+        except Exception as err:
+            logger.error(f"Dump {self.CLUSTER_TRACE_TIME_TABLE} failed, err: {err}")
+        finally:
+            if conn:
+                DBManager.destroy_db_connect(conn, cursor)
1) + "?") + DBManager.executemany_sql(conn, sql, data) + except Exception as err: + logger.error(f"Dump {self.CLUSTER_TRACE_TIME_TABLE} failed, err: {err}") + finally: + DBManager.destroy_db_connect(conn, cursor) diff --git a/profiler/msprof_analyze/cluster_analyse/analysis/step_trace_time_parser.py b/profiler/msprof_analyze/cluster_analyse/analysis/step_trace_time_parser.py new file mode 100644 index 0000000000000000000000000000000000000000..77802094dcb30fe9addf4c6fbb134fbaa8c3b0b4 --- /dev/null +++ b/profiler/msprof_analyze/cluster_analyse/analysis/step_trace_time_parser.py @@ -0,0 +1,108 @@ +# Copyright (c) 2024, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os.path +import pandas as pd +from msprof_analyze.cluster_analyse.cluster_data_preprocess.mindspore_data_preprocessor import MindsporeDataPreprocessor +from msprof_analyze.cluster_analyse.cluster_data_preprocess.msprof_data_preprocessor import MsprofDataPreprocessor +from msprof_analyze.prof_common.database_service import DatabaseService +from msprof_analyze.cluster_analyse.prof_bean.step_trace_time_bean import StepTraceTimeData +from msprof_analyze.prof_common.path_manager import PathManager +from msprof_analyze.cluster_analyse.analysis.msprof_step_trace_time_adapter import (MsprofStepTraceTimeAdapter, + MsprofStepTraceTimeDBAdapter) +from msprof_analyze.prof_common.constant import Constant +from msprof_analyze.prof_common.logger import get_logger + +logger = get_logger() + + +class StepTraceTimeParser: + + @staticmethod + def check_file(file: str) -> bool: + try: + PathManager.check_file_size(file) + PathManager.check_path_readable(file) + return True + except Exception as err: + logger.warning(f"Check file failed: {file}, error message: {err}") + return False + + @staticmethod + def parse_from_step_trace_time_csv(profiling_dir_path: str) -> pd.DataFrame: + """Parse step trace time data from CSV file for a single device""" + step_time_file = os.path.join(profiling_dir_path, Constant.SINGLE_OUTPUT, Constant.STEP_TIME_CSV) + if not StepTraceTimeParser.check_file(step_time_file): + return pd.DataFrame() + return pd.read_csv(step_time_file) + + @staticmethod + def parse_from_analysis_db(profiling_dir_path: str) -> pd.DataFrame: + """Parse step trace time data from analysis DB for a single device""" + analysis_db = os.path.join(profiling_dir_path, Constant.SINGLE_OUTPUT, Constant.DB_COMMUNICATION_ANALYZER) + data_service = DatabaseService(analysis_db, {}) + data_service.add_table_for_query(Constant.TABLE_STEP_TRACE) + query_res = data_service.query_data() + if Constant.TABLE_STEP_TRACE not in query_res: + logger.warning(f"Failed to query {Constant.TABLE_STEP_TRACE} in {profiling_dir_path}") + return pd.DataFrame() + return pd.DataFrame(query_res.get(Constant.TABLE_STEP_TRACE)) + + @staticmethod + def parse_from_msprof_db(profiling_dir_path: str) -> pd.DataFrame: + """Parse step trace time data from msprof DB for a single device""" + profiler_db = 
diff --git a/profiler/msprof_analyze/cluster_analyse/cluster_data_preprocess/mindspore_data_preprocessor.py b/profiler/msprof_analyze/cluster_analyse/cluster_data_preprocess/mindspore_data_preprocessor.py
index c22ecb1ad907f8d20bfe2b380db798928d0343b9..fc2afc774133f5a023aa7d892d9d05a415f53dcb 100644
--- a/profiler/msprof_analyze/cluster_analyse/cluster_data_preprocess/mindspore_data_preprocessor.py
+++ b/profiler/msprof_analyze/cluster_analyse/cluster_data_preprocess/mindspore_data_preprocessor.py
@@ -38,6 +38,26 @@ class MindsporeDataPreprocessor(DataPreprocessor):
                 return os.path.join(profiling_path, file_name)
         return ""
 
+    @classmethod
+    def get_mindspore_db(cls, profiling_path):
+        mindspore_db_pattern = r"^ascend_mindspore_profiler(?:_\d+)?\.db$"
+        mindspore_db_list = []
+
+        # Check if ASCEND_PROFILER_OUTPUT exists as subdirectory
+        possible_subdir = os.path.join(profiling_path, Constant.SINGLE_OUTPUT)
+        search_path = profiling_path
+        if os.path.isdir(possible_subdir):
+            search_path = possible_subdir
+
+        for file_name in os.listdir(search_path):
+            if re.match(mindspore_db_pattern, file_name):
+                mindspore_db_list.append(file_name)
+
+        if mindspore_db_list:
+            mindspore_db_list.sort()
+            return os.path.join(search_path, mindspore_db_list[-1])
+        return ""
+
     def get_data_map(self) -> dict:
         rank_id_map = defaultdict(list)
         for dir_name in self.path_list:
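get_mindspore_db accepts both the plain and the rank-suffixed MindSpore database names; the lexicographic sort then keeps the last match. A quick illustrative check of the pattern:

    import re

    pattern = r"^ascend_mindspore_profiler(?:_\d+)?\.db$"
    assert re.match(pattern, "ascend_mindspore_profiler.db")
    assert re.match(pattern, "ascend_mindspore_profiler_7.db")
    assert not re.match(pattern, "ascend_mindspore_profiler_7.db.tmp")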
diff --git a/profiler/msprof_analyze/cluster_analyse/prof_bean/step_trace_time_bean.py b/profiler/msprof_analyze/cluster_analyse/prof_bean/step_trace_time_bean.py
index a7d92cc3049d171ac14230f1363db326607c6348..4fb6dc3ef59a917bb03dd6b0f4869f0a47e82754 100644
--- a/profiler/msprof_analyze/cluster_analyse/prof_bean/step_trace_time_bean.py
+++ b/profiler/msprof_analyze/cluster_analyse/prof_bean/step_trace_time_bean.py
@@ -12,6 +12,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+import pandas as pd
+
 from msprof_analyze.prof_common.logger import get_logger
 
 logger = get_logger()
@@ -44,3 +47,112 @@ class StepTraceTimeBean:
     @property
     def all_headers(self) -> list:
         return self.COMPLEMENT_HEADER + list(self._data.keys())[1:]
+
+
+class StepTraceTimeData:
+    _MAPPING_DF = pd.DataFrame([
+        ('Step', ['Step', 'step']),
+        ('Comp', ['Computing', 'computing']),
+        ('Comm_non_overlap', ['Communication(Not Overlapped)', 'communication_not_overlapped']),
+        ('Overlap', ['Overlapped', 'overlapped']),
+        ('Comm', ['Communication', 'communication']),
+        ('Free', ['Free', 'free']),
+        ('Stage', ['Stage', 'stage']),
+        ('Bubble', ['Bubble', 'bubble']),
+        ('Comm_non_overlap_excl_recv', [
+            'Communication(Not Overlapped and Exclude Receive)',
+            'communication_not_overlapped_and_exclude_receive'
+        ]),
+        ('Prep', ['Preparing', 'preparing']),
+        ('Device_id', ['Device_id', 'deviceId'])
+    ], columns=['short_name', 'possible_variations'])
+
+    # Generate the mapping dictionary
+    _COLUMN_MAPPING = {}
+    for _, row in _MAPPING_DF.iterrows():
+        for variation in row['possible_variations']:
+            _COLUMN_MAPPING[variation] = row['short_name']
+
+    def __init__(self):
+        self.rank_data = {}
+        self.mapping_record = {}  # key: rank_id, value: columns to standard_cols dict
+
+    def add_rank_data(self, rank_id: int, raw_df: pd.DataFrame):
+        if rank_id in self.rank_data:
+            logger.warning(f"Duplicated rank_id: {rank_id}")
+            return
+        self._preprocess_data(rank_id, raw_df)
+
+    def get_rank_data(self, rank_id: int):
+        """
+        Get processed data for a specific rank
+        """
+        return self.rank_data.get(rank_id, None)
+
+    def get_columns_mapping(self, rank_id=None):
+        """
+        Get columns mapping info for a specific rank.
+        If rank_id is None, check if all mappings are the same:
+        - If all same, return the common mapping
+        - If different, return None
+        """
+        if rank_id is not None:
+            return self.mapping_record.get(rank_id, None)
+
+        # When rank_id is None, check all mappings
+        if not self.mapping_record:
+            return None
+
+        # Get all values
+        values = list(self.mapping_record.values())
+        first_value = values[0]
+
+        # Check if all values are equal to the first value
+        for value in values[1:]:
+            if value != first_value:
+                logger.warning("Step trace time columns are not the same across cluster data")
+                return None
+
+        return first_value
+
+    def get_cluster_data(self):
+        """
+        Get the merged data of all ranks
+        """
+        return self._merge_data()
+
+    def get_mapping_info(self, rank_id: int):
+        """
+        Get column mapping information for a specific rank
+        """
+        return self.mapping_record.get(rank_id, None)
+
+    def _merge_data(self):
+        """
+        Merge all rank DataFrames into a single combined DataFrame
+        """
+        if not self.rank_data:
+            return pd.DataFrame()
+        return pd.concat(list(self.rank_data.values()), ignore_index=True)
+
+    def _preprocess_data(self, rank_id: int, df: pd.DataFrame):
+        """
+        1) Normalize the column names
+        2) Add the rank id as Device_id if the column is missing
+        """
+        if df is None or df.empty:
+            logger.warning(f"Invalid input for StepTraceTimeData, rank_id: {rank_id}")
+            return
+        new_df = df.copy()
+        mapping_info = {}  # original column name -> standard short name
+        for column in new_df.columns:
+            if column not in self._COLUMN_MAPPING:
+                logger.warning(f"Unrecognized column in rank {rank_id} StepTraceTimeData: {column}")
+                return
+            mapping_info[column] = self._COLUMN_MAPPING[column]
+        new_df.columns = list(mapping_info.values())
+
+        if 'Device_id' not in new_df.columns:
+            new_df['Device_id'] = rank_id
+        self.rank_data[rank_id] = new_df
+        self.mapping_record[rank_id] = mapping_info
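_COLUMN_MAPPING flattens every accepted spelling (CSV-style headers and DB-style identifiers) onto one short internal name, which is what lets add_rank_data ingest frames from either source. The flattening, replayed on a two-entry subset:

    variations = {"Comp": ["Computing", "computing"], "Device_id": ["Device_id", "deviceId"]}
    column_mapping = {name: short for short, names in variations.items() for name in names}
    assert column_mapping["computing"] == "Comp"
    assert column_mapping["deviceId"] == "Device_id"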
diff --git a/profiler/msprof_analyze/test/ut/cluster_analyse/cluster_data_preprocess/test_step_trace_time_analysis.py b/profiler/msprof_analyze/test/ut/cluster_analyse/cluster_data_preprocess/test_step_trace_time_analysis.py
index 067886ec20125de045e18d183d0cf31d69b3eb23..05ee6b5e360201285163806f9deae2e0407b96b2 100644
--- a/profiler/msprof_analyze/test/ut/cluster_analyse/cluster_data_preprocess/test_step_trace_time_analysis.py
+++ b/profiler/msprof_analyze/test/ut/cluster_analyse/cluster_data_preprocess/test_step_trace_time_analysis.py
@@ -15,6 +15,8 @@
 
 import unittest
 
+import pandas as pd
+
 from msprof_analyze.cluster_analyse.analysis.step_trace_time_analysis import StepTraceTimeAnalysis
 from msprof_analyze.cluster_analyse.prof_bean.step_trace_time_bean import StepTraceTimeBean
 from msprof_analyze.prof_common.constant import Constant
@@ -44,38 +46,28 @@ class TestStepTraceTimeAnalysis(unittest.TestCase):
     def test_analyze_step_time_when_give_normal_expect_stage(self):
         check = StepTraceTimeAnalysis({})
         check.data_type = Constant.TEXT
-        check.step_time_dict = {
-            0: [
-                StepTraceTimeBean({"Step": 0, "time1": 1, "time2": 2}),
-                StepTraceTimeBean({"Step": 1, "time1": 1, "time2": 2}),
-            ],
-            1: [
-                StepTraceTimeBean({"Step": 0, "time1": 10, "time2": 20}),
-                StepTraceTimeBean({"Step": 1, "time1": 10, "time2": 20})
-            ]
-        }
+        step_time_data = [
+            {"Step": 0, "Type": "rank", "Index": 0, "time1": 1, "time2": 2},
+            {"Step": 1, "Type": "rank", "Index": 0, "time1": 1, "time2": 2},
+            {"Step": 0, "Type": "rank", "Index": 1, "time1": 10, "time2": 20},
+            {"Step": 1, "Type": "rank", "Index": 1, "time1": 10, "time2": 20}
+        ]
+        check.step_df = pd.DataFrame(step_time_data)
         check.communication_data_dict = {Constant.STAGE: [[0, 1]]}
         check.analyze_step_time()
-        self.assertIn([0, 'stage', (0, 1), 10.0, 20.0], check.step_data_list)
+        self.assertIn([0, 'stage', (0, 1), 10.0, 20.0], check.step_df.values.tolist())
 
     def test_analyze_step_time_when_given_none_step_expect_stage_and_rank_row(self):
         check = StepTraceTimeAnalysis({})
         check.data_type = Constant.TEXT
-        check.step_time_dict = {
-            0: [
-                StepTraceTimeBean({"Step": None, "time1": 1, "time2": 2})
-            ],
-            1: [
-                StepTraceTimeBean({"Step": None, "time1": 10, "time2": 20}),
-            ],
-            2: [
-                StepTraceTimeBean({"Step": None, "time1": 2, "time2": 3}),
-            ],
-            3: [
-                StepTraceTimeBean({"Step": None, "time1": 1, "time2": 1}),
-            ],
-        }
+        step_time_data = [
+            {"Step": None, "Type": "rank", "Index": 0, "time1": 1, "time2": 2},
+            {"Step": None, "Type": "rank", "Index": 1, "time1": 10, "time2": 20},
+            {"Step": None, "Type": "rank", "Index": 2, "time1": 2, "time2": 3},
+            {"Step": None, "Type": "rank", "Index": 3, "time1": 1, "time2": 1}
+        ]
+        check.step_df = pd.DataFrame(step_time_data)
         check.communication_data_dict = {Constant.STAGE: [[0, 1], [2, 3]]}
         check.analyze_step_time()
-        self.assertIn([None, 'stage', (2, 3), 2.0, 3.0], check.step_data_list)
-        self.assertIn([None, 'rank', 0, 1.0, 2.0], check.step_data_list)
\ No newline at end of file
+        self.assertIn([None, 'stage', (2, 3), 2.0, 3.0], check.step_df.values.tolist())
+        self.assertIn([None, 'rank', 0, 1.0, 2.0], check.step_df.values.tolist())
\ No newline at end of file
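The new StepTraceTimeData tests below exercise, among other things, the consensus rule of get_columns_mapping(): a single shared mapping is returned only when every rank reported the same one. Condensed illustration:

    records = {0: {"Computing": "Comp"}, 1: {"Computing": "Comp"}}
    values = list(records.values())
    common = values[0] if all(v == values[0] for v in values) else None
    assert common == {"Computing": "Comp"}  # any mismatching rank would yield None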
diff --git a/profiler/msprof_analyze/test/ut/cluster_analyse/prof_bean/test_step_trace_time_data.py b/profiler/msprof_analyze/test/ut/cluster_analyse/prof_bean/test_step_trace_time_data.py
new file mode 100644
index 0000000000000000000000000000000000000000..ba4ab3ea668ecb75e9a3f8164d324e645537bcf1
--- /dev/null
+++ b/profiler/msprof_analyze/test/ut/cluster_analyse/prof_bean/test_step_trace_time_data.py
@@ -0,0 +1,139 @@
+# Copyright (c) 2025, Huawei Technologies Co., Ltd.
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import pandas as pd
+from msprof_analyze.cluster_analyse.prof_bean.step_trace_time_bean import StepTraceTimeData
+
+
+class TestStepTraceTimeData(unittest.TestCase):
+    def setUp(self):
+        self.step_trace_data = StepTraceTimeData()
+        # Sample data for testing
+        self.sample_df = pd.DataFrame({
+            'Step': ['Step1', 'Step2'],
+            'Computing': [1.0, 2.0],
+            'Communication': [0.5, 1.0],
+            'Free': [0.2, 0.3]
+        })
+
+    def test_add_rank_data_when_valid_dataframe_then_success(self):
+        # Test adding data from DataFrame
+        self.step_trace_data.add_rank_data(0, self.sample_df)
+        rank_data = self.step_trace_data.get_rank_data(0)
+
+        self.assertIsNotNone(rank_data)
+        self.assertEqual(len(rank_data), 2)
+        self.assertEqual(list(rank_data.columns), ['Step', 'Comp', 'Comm', 'Free', 'Device_id'])
+        self.assertEqual(rank_data['Device_id'].iloc[0], 0)
+
+    def test_add_rank_data_when_duplicate_rank_id_then_keep_original(self):
+        # Test handling of duplicate rank_id
+        self.step_trace_data.add_rank_data(0, self.sample_df)
+        self.step_trace_data.add_rank_data(0, self.sample_df)  # Should log warning
+        rank_data = self.step_trace_data.get_rank_data(0)
+        self.assertEqual(len(rank_data), 2)  # Should still have original data
+
+    def test_get_cluster_data_when_multiple_ranks_then_merged_successfully(self):
+        # Test merging data from multiple ranks
+        self.step_trace_data.add_rank_data(0, self.sample_df)
+        self.step_trace_data.add_rank_data(1, self.sample_df)
+
+        cluster_data = self.step_trace_data.get_cluster_data()
+        self.assertEqual(len(cluster_data), 4)  # 2 rows per rank
+        self.assertEqual(len(cluster_data['Device_id'].unique()), 2)
+
+    def test_get_mapping_info_when_valid_rank_then_return_mapping(self):
+        # Test getting mapping information
+        self.step_trace_data.add_rank_data(0, self.sample_df)
+        mapping_info = self.step_trace_data.get_mapping_info(0)
+
+        self.assertIsNotNone(mapping_info)
+        self.assertEqual(mapping_info['Computing'], 'Comp')
+        self.assertEqual(mapping_info['Communication'], 'Comm')
+        self.assertEqual(mapping_info['Free'], 'Free')
+
+    def test_add_rank_data_when_empty_dataframe_then_no_data_added(self):
+        # Test handling of invalid input
+        empty_df = pd.DataFrame()
+        self.step_trace_data.add_rank_data(0, empty_df)
+        self.assertIsNone(self.step_trace_data.get_rank_data(0))
+
+    def test_add_rank_data_when_column_variations_then_normalized_successfully(self):
+        # Test column name normalization with different variations
+        df_with_variations = pd.DataFrame({
+            'step': ['Step1', 'Step2'],
+            'computing': [1.0, 2.0],
+            'communication_not_overlapped': [0.5, 1.0],
+            'overlapped': [0.3, 0.4]
+        })
+
+        self.step_trace_data.add_rank_data(0, df_with_variations)
+        rank_data = self.step_trace_data.get_rank_data(0)
+
+        self.assertIsNotNone(rank_data)
+        self.assertIn('Step', rank_data.columns)
+        self.assertIn('Comp', rank_data.columns)
+        self.assertIn('Comm_non_overlap', rank_data.columns)
+        self.assertIn('Overlap', rank_data.columns)
+
+    def test_get_columns_mapping_when_specific_rank_then_return_mapping(self):
+        # Test getting mapping for a specific rank
+        self.step_trace_data.add_rank_data(0, self.sample_df)
+        mapping = self.step_trace_data.get_columns_mapping(0)
+
+        self.assertIsNotNone(mapping)
+        self.assertEqual(mapping['Step'], 'Step')
+        self.assertEqual(mapping['Computing'], 'Comp')
+        self.assertEqual(mapping['Communication'], 'Comm')
+        self.assertEqual(mapping['Free'], 'Free')
+
+    def test_get_columns_mapping_when_all_ranks_same_then_return_common_mapping(self):
+        # Test getting mapping when all ranks have same mapping
+        self.step_trace_data.add_rank_data(0, self.sample_df)
+        self.step_trace_data.add_rank_data(1, self.sample_df)
+
+        mapping = self.step_trace_data.get_columns_mapping()
+        self.assertIsNotNone(mapping)
+        self.assertEqual(mapping['Step'], 'Step')
+        self.assertEqual(mapping['Computing'], 'Comp')
+        self.assertEqual(mapping['Communication'], 'Comm')
+        self.assertEqual(mapping['Free'], 'Free')
+
+    def test_get_columns_mapping_when_different_mappings_then_return_none(self):
+        # Test getting mapping when ranks have different mappings
+        df1 = pd.DataFrame({
+            'Step': ['Step1'],
+            'Computing': [1.0],
+            'Communication': [0.5],
+            'Free': [0.2]
+        })
+        df2 = pd.DataFrame({
+            'step': ['Step1'],
+            'computing': [1.0],
+            'communication': [0.5],
+            'free': [0.2]
+        })
+
+        self.step_trace_data.add_rank_data(0, df1)
+        self.step_trace_data.add_rank_data(1, df2)
+
+        mapping = self.step_trace_data.get_columns_mapping()
+        self.assertIsNone(mapping)
+
+    def test_get_columns_mapping_when_no_data_then_return_none(self):
+        # Test getting mapping when no data exists
+        mapping = self.step_trace_data.get_columns_mapping()
+        self.assertIsNone(mapping)
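Assuming the repository root is on PYTHONPATH (the usual setup for this test tree), the new suite should run standalone with the standard unittest runner, e.g.:

    python -m unittest discover -s profiler/msprof_analyze/test/ut/cluster_analyse/prof_bean -p "test_step_trace_time_data.py"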