diff --git a/profiler/msprof_analyze/cluster_analyse/README.md b/profiler/msprof_analyze/cluster_analyse/README.md index 4ac6f674fc3293a5a434e423ce8a748478846f74..a1e275d2aabc3e2ba84e5ddb019115a6a16ede2d 100644 --- a/profiler/msprof_analyze/cluster_analyse/README.md +++ b/profiler/msprof_analyze/cluster_analyse/README.md @@ -110,6 +110,8 @@ experimental_config = torch_npu.profiler._ExperimentalConfig( | mstx2commop | 集群场景基于mstx打点信息生成通信算子信息,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。输出交付件ascend_pytorch_profiler_{rank_id}.db增加COMMUNICATION_OP, STRING_IDS分析表格。 | 否 | | cluster_time_summary | 集群场景迭代耗时细粒度拆解,详见[使用指导](../docs/cluster_time_summary.md)。 | 否 | | cluster_time_compare_summary | 集群间迭代耗时比对,详见[使用指导](../docs/cluster_time_summary.md)。 | 否 | +| p2p_pairing | 集群场景P2P算子生成全局关联索引,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。输出的关联索引会作为一个新的字段`opConnectionId`附在原性能数据ascend_pytorch_profiler_{rank_id}.db文件的`COMMUNICATION_OP`的表中。 | 否 | +| pp_chart | 基于打点后的ascend_pytorch_profiler_{rank_id}.db文件,分析打点数据,还原pp流水图 | 否 | | 自定义分析参数 | 与cann_api_sum、compute_op_sum、hccl_sum等参数功能类似,用户可自定义一套性能数据的分析规则,要求用户开发者详细了解性能分析规则,具体开发指导请参见“[自定义分析规则开发指导](#自定义分析规则开发指导)”。 | 否 | diff --git a/profiler/msprof_analyze/cluster_analyse/recipes/base_recipe_analysis.py b/profiler/msprof_analyze/cluster_analyse/recipes/base_recipe_analysis.py index 0289b1747a642f40220c43784ef2231d4c06b0bf..9f84e961fb5e9c3fa3b7f0393010c2200d88fcde 100644 --- a/profiler/msprof_analyze/cluster_analyse/recipes/base_recipe_analysis.py +++ b/profiler/msprof_analyze/cluster_analyse/recipes/base_recipe_analysis.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import argparse +import json import os import shutil import sys @@ -24,6 +25,7 @@ import pandas as pd from msprof_analyze.prof_common.db_manager import DBManager from msprof_analyze.cluster_analyse.common_func.utils import convert_unit from msprof_analyze.prof_common.constant import Constant +from msprof_analyze.prof_common.database_service import DatabaseService from msprof_analyze.prof_common.logger import get_logger from msprof_analyze.prof_common.path_manager import PathManager from msprof_analyze.cluster_analyse.cluster_data_preprocess.msprof_data_preprocessor import MsprofDataPreprocessor @@ -36,8 +38,10 @@ logger = get_logger() class BaseRecipeAnalysis(ABC): UNIT = "Us" DB_UNIT = "Ns" - RANK_LIST = "rank_list" + TP_SIZE = "tensor_model_parallel_size" + PP_SIZE = "pipeline_model_parallel_size" + DP_SIZE = "data_parallel_size" def __init__(self, params): self._collection_dir = params.get(Constant.COLLECTION_PATH, "") @@ -152,6 +156,67 @@ class BaseRecipeAnalysis(ABC): shutil.copy(helper_file_path, helper_output_path) os.chmod(helper_output_path, Constant.FILE_AUTHORITY) + def map_rank_pp_stage(self, distributed_args): + tp_size = distributed_args.get(self.TP_SIZE, 1) + pp_size = distributed_args.get(self.PP_SIZE, 1) + dp_size = distributed_args.get(self.DP_SIZE, 1) + rank_pp_stage_map = {} + rank = 0 + for i in range(pp_size): + for _ in range(tp_size * dp_size): + rank_pp_stage_map[rank] = i + rank += 1 + return rank_pp_stage_map + + def load_distributed_args(self): + tp_size = self._extra_args.get("tp", None) + pp_size = self._extra_args.get("pp", None) + dp_size = self._extra_args.get("dp", None) + if tp_size and pp_size and dp_size: + if tp_size <= 0 or pp_size <= 0 or dp_size <= 0: + logger.error("Invalid distributed_args, tp pp dp < 0.") + return None + return { + self.TP_SIZE: tp_size, + self.DP_SIZE: dp_size, + 
self.PP_SIZE: pp_size, + } + else: + rank_id = list(self._data_map.keys())[0] + profiler_db_path = self._data_map[rank_id] + db_path = os.path.join(profiler_db_path, Constant.SINGLE_OUTPUT, f"ascend_pytorch_profiler_{rank_id}.db") + if os.path.exists(db_path): + try: + service = DatabaseService(db_path) + service.add_table_for_query("META_DATA", ["name", "value"]) + df = service.query_data().get("META_DATA", None) + distributed_args = df.loc[df["name"] == "distributed_args", "value"] + if distributed_args.empty: + distributed_args = {} + logger.error("Distributed args not in profiling files, please input manually.") + else: + distributed_args = json.loads(distributed_args.values[0]) + except Exception as err: + logger.error(err) + logger.error("Distributed args not in profiling files, please input manually.") + return None + tp_size = distributed_args.get(self.TP_SIZE, 1) + pp_size = distributed_args.get(self.PP_SIZE, 1) + dp_size = distributed_args.get(self.DP_SIZE, 1) + if not isinstance(tp_size, int) or not isinstance(pp_size, int) or not isinstance(dp_size, int): + logger.error("Invalid distributed_args in profiling files, please input manually.") + return None + if tp_size <= 0 or pp_size <= 0 or dp_size <= 0: + logger.error("Invalid distributed_args in profiling files, please input manually.") + return None + return { + self.TP_SIZE: tp_size, + self.PP_SIZE: pp_size, + self.DP_SIZE: dp_size, + } + logger.error(f"Db_file: {db_path} not exist.") + return None + def _get_rank_db(self): invalid_rank_id = [] if self._rank_list == 'all': diff --git a/profiler/msprof_analyze/cluster_analyse/recipes/p2p_pairing/__init__.py b/profiler/msprof_analyze/cluster_analyse/recipes/p2p_pairing/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..a355e5a7f08206fc39dda4646817224c067f29f7 --- /dev/null +++ b/profiler/msprof_analyze/cluster_analyse/recipes/p2p_pairing/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) 2025, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/profiler/msprof_analyze/cluster_analyse/recipes/p2p_pairing/p2p_pairing.py b/profiler/msprof_analyze/cluster_analyse/recipes/p2p_pairing/p2p_pairing.py new file mode 100644 index 0000000000000000000000000000000000000000..fd7224f372018f4d64aa2d46d940561030303f00 --- /dev/null +++ b/profiler/msprof_analyze/cluster_analyse/recipes/p2p_pairing/p2p_pairing.py @@ -0,0 +1,242 @@ +# Copyright (c) 2025, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from json import JSONDecodeError + +import numpy as np +import pandas as pd + +from msprof_analyze.cluster_analyse.recipes.base_recipe_analysis import BaseRecipeAnalysis +from msprof_analyze.prof_common.constant import Constant +from msprof_analyze.prof_common.constant import ProfilerTableConstant +from msprof_analyze.prof_common.db_manager import DBManager +from msprof_analyze.prof_common.file_manager import FileManager +from msprof_analyze.prof_common.logger import get_logger +from msprof_analyze.prof_exports.p2p_pairing_export import P2PPairingExport + + +logger = get_logger() + + +class P2PPairing(BaseRecipeAnalysis): + + P2P_OP_NAME_PATTERN = r"^hcom_([Ss]end|[Rr](ecv|eceive))__\d+_\d+_\d+$" + DOMAIN_ID_EXTRACT_PATTERN = r"__(\d+)_\d+_\d+" + RECEIVE_OP_MATCH_PATTERN = r"[Rr]ecv|[Rr]eceive" + VALID_DST_RANK_TASK_TYPE = [Constant.NOTIFY_RECORD, Constant.NOTIFY_WAIT] + # intermediate dataframe column names + COL_NAME_IS_UNIQUE_VALUE = "isUniqueValue" + COL_NAME_OP_DST_RANK = "opDstRank" + COL_NAME_DOMAIN_ID = "domainId" + COL_NAME_IS_RECEIVE = "isReceive" + COL_NAME_OP_NAMING_INDEX = "opNamingIndex" + # output column name + COL_NAME_P2P_CONNECTION_ID = "opConnectionId" + # export params + TARGET_TABLE_NAME = Constant.TABLE_COMMUNICATION_OP + + def __init__(self, params): + super().__init__(params) + logger.info("P2PPairing init.") + + @property + def base_dir(self): + return os.path.basename(os.path.dirname(__file__)) + + def run(self, context): + self.mapper_func(context) + logger.info("P2PPairing completed.") + + def update_connection_info_to_table(self, df_result, profiler_db_path): + """ + 将生成好的连接ID添加至COMMUNICATION OP表中,新增列`opConnectionId`。目前只处理Send和Recv算子,对应的opId会更新具体的连接ID, + 否则置空 + """ + conn, cursor = DBManager.create_connect_db(profiler_db_path) + ret = DBManager.check_columns_exist(cursor, self.TARGET_TABLE_NAME, {self.COL_NAME_P2P_CONNECTION_ID}) + if ret is None: + logger.error("Failed to connect to the database. Please check the database configurations") + return + if self.COL_NAME_P2P_CONNECTION_ID in ret: + logger.error(f"`{self.COL_NAME_P2P_CONNECTION_ID}` already exists in the {self.TARGET_TABLE_NAME}. " + f"Exiting to prevent result overwrite.") + return + DBManager.execute_sql( + conn, + f"ALTER TABLE {self.TARGET_TABLE_NAME} ADD COLUMN {self.COL_NAME_P2P_CONNECTION_ID} TEXT" + ) + DBManager.execute_sql( + conn, + f"UPDATE {self.TARGET_TABLE_NAME} SET {self.COL_NAME_P2P_CONNECTION_ID} = NULL" + ) + DBManager.executemany_sql( + conn, + f""" + UPDATE {self.TARGET_TABLE_NAME} + SET {self.COL_NAME_P2P_CONNECTION_ID} = ? + WHERE {ProfilerTableConstant.OP_NAME} = ?;""", + [(row[self.COL_NAME_P2P_CONNECTION_ID], row[P2PPairingExport.CO_OP_NAME]) + for _, row in df_result.iterrows()] + ) + DBManager.destroy_db_connect(conn, cursor) + + def generate_p2p_connection_index(self, df): + """ + 生成每一个P2P的算子的对应连接ID,连接ID的生成规则按照`通信域_Send卡号_Recv卡号_算子index`。 + 其中通信域为通信域字符串的哈希值后三位表示;Send卡和Recv卡分别为这个通信域内的local rank号;算子index是这两张卡之间按时间线排序, + 出现Send和Recv算子已有的频次。比如说,一个算子的名称为`hcom_send_233_58_1`,自己在通信域内的rank号为0,对端的rank号为1;在这之前 + 并没有存在0卡向1卡的Send任务。因此生成的id为`233_0_1_0` + """ + df[self.COL_NAME_DOMAIN_ID] = df[P2PPairingExport.OP_NAME]. \ + str.extract(self.DOMAIN_ID_EXTRACT_PATTERN)[0] + df[self.COL_NAME_IS_RECEIVE] = df[P2PPairingExport.OP_NAME]. 
\ + str.contains(self.RECEIVE_OP_MATCH_PATTERN) + df.loc[ + df[self.COL_NAME_IS_RECEIVE], [P2PPairingExport.SRC_RANK, self.COL_NAME_OP_DST_RANK] + ] = df.loc[ + df[self.COL_NAME_IS_RECEIVE], [self.COL_NAME_OP_DST_RANK, P2PPairingExport.SRC_RANK] + ].values + df[self.COL_NAME_OP_NAMING_INDEX] = df.sort_values(by=[P2PPairingExport.START_TIME]). \ + groupby([P2PPairingExport.SRC_RANK, self.COL_NAME_OP_DST_RANK]).cumcount() + df[self.COL_NAME_P2P_CONNECTION_ID] = (df[self.COL_NAME_DOMAIN_ID].astype(str) + "_" + + df[P2PPairingExport.SRC_RANK].astype(str) + "_" + + df[self.COL_NAME_OP_DST_RANK].astype(str) + "_" + + df[self.COL_NAME_OP_NAMING_INDEX].astype(str)) + return df.reset_index() + + def fine_filtering_src_dst_ranks(self, df: pd.DataFrame): + """ + 精筛符合条件的数据: + 1、小算子任务包含了“Notify_Record”和“Notify_Wait”的数据 + 2、上一步得到的数据中对端卡号是否一致,如果不一致则会抛出warning + 3、步骤1得到数据中本端卡号是否一致,如果不一致则会报出error返回空值 + """ + df = df[df[P2PPairingExport.TASK_TYPE].isin(self.VALID_DST_RANK_TASK_TYPE)] + + def check_dst_rank_unique(group): + return group[P2PPairingExport.DST_RANK].nunique() == 1 + + unique_dst_rank: pd.DataFrame = (df.groupby(P2PPairingExport.OP_NAME).apply(check_dst_rank_unique)) + + def get_dst_rank_value(group): + if group[P2PPairingExport.DST_RANK].nunique() == 1: + return group[P2PPairingExport.DST_RANK].iloc[0] + return np.nan + + dst_rank_value: pd.DataFrame = (df.groupby(P2PPairingExport.OP_NAME, group_keys=False). + apply(get_dst_rank_value)) + + df = df.copy() + df[self.COL_NAME_IS_UNIQUE_VALUE] = df[P2PPairingExport.OP_NAME].map(unique_dst_rank) + df[self.COL_NAME_OP_DST_RANK] = df[P2PPairingExport.OP_NAME].map(dst_rank_value) + df[self.COL_NAME_OP_DST_RANK] = df[self.COL_NAME_OP_DST_RANK].fillna(Constant.INVALID_RANK_NUM) + df[self.COL_NAME_OP_DST_RANK] = df[self.COL_NAME_OP_DST_RANK].astype(df[P2PPairingExport.DST_RANK].dtype) + + check_dst_rank_unique_false: pd.DataFrame = df[~df[self.COL_NAME_IS_UNIQUE_VALUE]] + if not check_dst_rank_unique_false.empty: + logger.warning(f"There are communication op entries with multiple destination ranks! " + f"Please check the corresponding profiler database file.") + + df = df[df[self.COL_NAME_IS_UNIQUE_VALUE]] + + src_rank_unique_values: int = df[P2PPairingExport.SRC_RANK].nunique() + if src_rank_unique_values != 1: + logger.error(f"There are communication op entries with multiple source ranks! " + f"Please check the corresponding profiler database file.") + return None + return df.reset_index() + + def filter_data_by_group_name(self, df: pd.DataFrame): + """ + 初步筛选出目标数据: + 1、筛选出Send和Recv的算子 + 2、筛选出同一opId在COMMUNICATION OP中groupName和COMMUNICATION TASK INFO中groupName一致的数据 + """ + df = df[df[P2PPairingExport.OP_NAME].str.match(self.P2P_OP_NAME_PATTERN)] + filtered_df = df[df[P2PPairingExport.CO_GROUP_NAME] == df[P2PPairingExport.CTI_GROUP_NAME]] + anomaly_group_match = df[df[P2PPairingExport.CO_GROUP_NAME] != df[P2PPairingExport.CTI_GROUP_NAME]] + if not anomaly_group_match.empty: + logger.warning(f"Group name mismatch in {len(anomaly_group_match)} entries. 
Please check the" + f" profiler database in communication task info.") + return filtered_df.reset_index() + + def extract_pp_group_from_metadata(self, profiler_parent_path) -> any: + """ + 从profiler_metadata.json的文件中获取pp通信域的信息 + """ + metadata_path = os.path.join(profiler_parent_path, Constant.PROFILER_METADATA) + try: + if os.path.exists(metadata_path): + metadata = FileManager.read_json_file(metadata_path) + parallel_group_info: dict = metadata.get(Constant.PARALLEL_GROUP_INFO, None) if metadata else None + else: + raise FileNotFoundError(f"No `{Constant.PROFILER_METADATA}` found in {profiler_parent_path}.") + except (FileNotFoundError, JSONDecodeError) as e: + logger.error(f"Failed to load profiler metadata: {e}") + return None + + if parallel_group_info is None: + logger.error(f"No key name `{Constant.PARALLEL_GROUP_INFO}` found in {metadata_path}") + return None + + pp_group_info = [] + for name in parallel_group_info: + each_group_info: dict = parallel_group_info[name] + if each_group_info[Constant.GROUP_NAME] == Constant.PP: + pp_group_info.append(parallel_group_info[name]) + if not pp_group_info: + logger.error(f"No pipeline parallel info found in {metadata_path}") + return None + + return pp_group_info + + def _mapper_func(self, data_map, analysis_class): + profiler_db_path: str = data_map.get(Constant.PROFILER_DB_PATH) + profiler_parent_path: str = os.path.dirname(os.path.dirname(profiler_db_path)) + + df: pd.DataFrame = P2PPairingExport(profiler_db_path, analysis_class).read_export_db() + if df is None or df.empty: + logger.warning(f"There is no stats data in {profiler_db_path}.") + return None + + pp_group_info = self.extract_pp_group_from_metadata(profiler_parent_path) # 暂时没用到,预留给后续确认用全局rank + if pp_group_info is None: + logger.error(f"Cannot obtain pipeline parallel info from the metadata. " + f"Please check the corresponding {Constant.PROFILER_METADATA}") + + df = self.filter_data_by_group_name(df) + if df.empty: + return None + + df_filtered = self.fine_filtering_src_dst_ranks(df.copy()) + if df_filtered is None: + logger.error("Got error when trying to match rank numbers!") + return None + + df_result = df_filtered.groupby([P2PPairingExport.OP_NAME, P2PPairingExport.CO_OP_NAME]).agg( + { + P2PPairingExport.START_TIME: "first", + P2PPairingExport.SRC_RANK: "first", + self.COL_NAME_OP_DST_RANK: "first" + } + ).reset_index() + + df_result = self.generate_p2p_connection_index(df_result) + + df_result = df_result[[P2PPairingExport.CO_OP_NAME, self.COL_NAME_P2P_CONNECTION_ID]] + + self.update_connection_info_to_table(df_result, profiler_db_path) + return data_map.get(Constant.RANK_ID) diff --git a/profiler/msprof_analyze/cluster_analyse/recipes/pp_chart/__init__.py b/profiler/msprof_analyze/cluster_analyse/recipes/pp_chart/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..7101187a2c2619f3b1c20dded14b433950b4c662 --- /dev/null +++ b/profiler/msprof_analyze/cluster_analyse/recipes/pp_chart/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) 2024, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/profiler/msprof_analyze/cluster_analyse/recipes/pp_chart/pp_chart.py b/profiler/msprof_analyze/cluster_analyse/recipes/pp_chart/pp_chart.py new file mode 100644 index 0000000000000000000000000000000000000000..358c645492274c12cd7d859914e71feabe0a9895 --- /dev/null +++ b/profiler/msprof_analyze/cluster_analyse/recipes/pp_chart/pp_chart.py @@ -0,0 +1,292 @@ +# Copyright (c) 2025, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections import defaultdict +import json +import os +import pandas as pd + +from msprof_analyze.cluster_analyse.recipes.base_recipe_analysis import BaseRecipeAnalysis +from msprof_analyze.prof_common.constant import Constant +from msprof_analyze.prof_common.logger import get_logger +from msprof_analyze.prof_common.database_service import DatabaseService +from msprof_analyze.prof_exports.pp_chart_export import PPChartExport + +logger = get_logger() + + +def filter_non_overlapping(df: pd.DataFrame) -> pd.DataFrame: + if df.empty: + return df + result = [] + last_end = -1 + for _, row in df.iterrows(): + if row['startNs'] >= last_end: + result.append(row) + last_end = row['endNs'] + return pd.DataFrame(result) + + +class PPChart(BaseRecipeAnalysis): + FORWARD_STAGE_0 = "FORWARD_STAGE_0" + FORWARD_STAGE_1 = "FORWARD_STAGE_1" # 表示一个microbatch在同一张卡的两个stage + BACKWARD_STAGE_0 = "BACKWARD_STAGE_0" + BACKWARD_STAGE_1 = "BACKWARD_STAGE_1" + STEP_TASK_INFO = "StepTaskInfo" + LOGITS = "logits" + + def __init__(self, params): + super().__init__(params) + logger.info("PPChart init.") + self.micro_batch_id_dict = defaultdict(list) + self.pp_stage_mstx_num = defaultdict(int) + self.micro_batch_num = None + self.pp_type = None + self.distributed_args = self.load_distributed_args() + self.load_pp_info() + + @property + def base_dir(self): + return os.path.basename(os.path.dirname(__file__)) + + @staticmethod + def generate_dualpipev_schedule(pp_size, num_microbatches): + num_microbatches = num_microbatches * 2 + num_warmup_stages = [0] * pp_size + num_interleaved_forward_stages = [0] * pp_size + num_1b1w1f_stages = [0] * pp_size + num_overlap_stages = [0] * pp_size + num_1b1overlap_stages = [0] * pp_size + num_interleaved_backward_stages = [0] * pp_size + num_cooldown_stages = [0] * pp_size + pp_size *= 2 + for i in range(pp_size // 2): + num_warmup_stages[i] = pp_size - 2 - i * 2 + num_interleaved_forward_stages[i] = i + 1 # 每个单位是一组1f1f + num_1b1w1f_stages[i] = pp_size // 2 - i - 1 + num_overlap_stages[i] = num_microbatches - pp_size * 2 + i * 2 + 2 + num_1b1overlap_stages[i] = (pp_size // 2 - i - 1) * 2 + num_interleaved_backward_stages[i] = i + 1 + num_cooldown_stages[i] = [i + 1, pp_size - 2 * i - 2, i + 1] + schedule_all_stages = { + 'warmup': num_warmup_stages, + 'interleaved_forward': num_interleaved_forward_stages, + '1b1w1f': num_1b1w1f_stages, + 'overlap': num_overlap_stages, + '1b1overlap': 
num_1b1overlap_stages, + 'interleaved_backward': num_interleaved_backward_stages, + 'cooldown': num_cooldown_stages + } + return schedule_all_stages + + def calculate_micro_batch_id_for_dualpipev(self): + pp_size = self.distributed_args.get(self.PP_SIZE) + if self.micro_batch_num is None or self.micro_batch_num < pp_size * 2: + logger.error("The micro_batch_num is less than pp_size * 2, please set it to a larger value.") + return + schedule_all_stages = self.generate_dualpipev_schedule(pp_size, self.micro_batch_num) + cur_micro_batch_id_dict = defaultdict(dict) + flag = defaultdict(bool) # 标识最后一个阶段是BACKWARD_STAGE_0开头还是BACKWARD_STAGE_1开头 + for stage_name, stage_num in schedule_all_stages.items(): + for i, num in enumerate(stage_num): + last_forward_id_0 = cur_micro_batch_id_dict[i].setdefault(self.FORWARD_STAGE_0, -1) + last_forward_id_1 = cur_micro_batch_id_dict[i].setdefault(self.FORWARD_STAGE_1, -1) + last_backward_id_0 = cur_micro_batch_id_dict[i].setdefault(self.BACKWARD_STAGE_0, -1) + last_backward_id_1 = cur_micro_batch_id_dict[i].setdefault(self.BACKWARD_STAGE_1, -1) + if stage_name == "warmup": + self.micro_batch_id_dict[i].extend([[str(x), 0] for x in range(num)]) + cur_micro_batch_id_dict[i][self.FORWARD_STAGE_0] = num - 1 + self.pp_stage_mstx_num[i] += num + elif stage_name == "interleaved_forward": + for j in range(num): + self.micro_batch_id_dict[i].append([str(last_forward_id_0 + j + 1), 1]) + self.micro_batch_id_dict[i].append([str(self.micro_batch_num + j), 1]) + cur_micro_batch_id_dict[i][self.FORWARD_STAGE_0] += 1 + cur_micro_batch_id_dict[i][self.FORWARD_STAGE_1] = self.micro_batch_num + num - 1 + self.pp_stage_mstx_num[i] += num * 2 + elif stage_name == "1b1w1f": + for j in range(num): + if i == 0: + self.micro_batch_id_dict[i].append([self.LOGITS, 2]) + self.pp_stage_mstx_num[i] += 1 + self.micro_batch_id_dict[i].append([f"{self.micro_batch_num + j}b", 2]) + self.micro_batch_id_dict[i].append([f"{self.micro_batch_num + j}w", 2]) + self.micro_batch_id_dict[i].append([str(last_forward_id_1 + j + 1), 2]) + cur_micro_batch_id_dict[i][self.FORWARD_STAGE_1] += 1 + cur_micro_batch_id_dict[i][self.BACKWARD_STAGE_1] = self.micro_batch_num + num - 1 + self.pp_stage_mstx_num[i] += num * 3 + elif stage_name == "overlap": + for j in range(num // 2): + if i == 0: + self.micro_batch_id_dict[i].append([self.LOGITS, 3]) + self.pp_stage_mstx_num[i] += 1 + if i == pp_size - 1 and j == 0: + self.micro_batch_id_dict[i].append([f"{last_forward_id_0 + j + 1}F", 3]) + self.micro_batch_id_dict[i].append([f"{last_backward_id_1 + j + 1}B", 3]) + self.pp_stage_mstx_num[i] += 1 + else: + self.micro_batch_id_dict[i].append( + [f"{last_forward_id_0 + j + 1}F+{last_backward_id_1 + j + 1}B", 3]) + self.micro_batch_id_dict[i].append( + [f"{last_forward_id_1 + j + 1}F+{last_backward_id_0 + j + 1}B", 3]) + cur_micro_batch_id_dict[i][self.FORWARD_STAGE_0] += 1 + cur_micro_batch_id_dict[i][self.FORWARD_STAGE_1] += 1 + cur_micro_batch_id_dict[i][self.BACKWARD_STAGE_0] += 1 + cur_micro_batch_id_dict[i][self.BACKWARD_STAGE_1] += 1 + self.pp_stage_mstx_num[i] += num + elif stage_name == "1b1overlap": + for j in range(num // 2): + if i == 0: + self.micro_batch_id_dict[i].append([self.LOGITS, 4]) + self.pp_stage_mstx_num[i] += 1 + self.micro_batch_id_dict[i].append([f"{last_backward_id_1 + j + 1}B", 4]) + self.micro_batch_id_dict[i].append( + [f"{last_forward_id_1 + j + 1}F+{last_backward_id_0 + j + 1}B", 4]) + cur_micro_batch_id_dict[i][self.FORWARD_STAGE_1] += 1 + 
cur_micro_batch_id_dict[i][self.BACKWARD_STAGE_0] += 1 + cur_micro_batch_id_dict[i][self.BACKWARD_STAGE_1] += 1 + self.pp_stage_mstx_num[i] += num + elif stage_name == "interleaved_backward": + for j in range(num): + if j % 2 == 0: + if i == 0: + self.micro_batch_id_dict[i].append([self.LOGITS, 5]) + self.pp_stage_mstx_num[i] += 1 + self.micro_batch_id_dict[i].append([str(f"{last_backward_id_1 + j // 2 + 1}B"), 5]) + cur_micro_batch_id_dict[i][self.BACKWARD_STAGE_1] += 1 + flag[i] = True + else: + self.micro_batch_id_dict[i].append([str(f"{last_backward_id_0 + j // 2 + 1}B"), 5]) + cur_micro_batch_id_dict[i][self.BACKWARD_STAGE_0] += 1 + flag[i] = False + self.pp_stage_mstx_num[i] += num + elif stage_name == "cooldown": + self.pp_stage_mstx_num[i] += pp_size # 不开dw分离 + while last_backward_id_0 < self.micro_batch_num - 1 or \ + last_backward_id_1 < self.micro_batch_num * 2 - 1: + if flag[i]: + if last_backward_id_0 < self.micro_batch_num - 1: + self.micro_batch_id_dict[i].append([str(f"{last_backward_id_0 + 1}B"), 6]) + last_backward_id_0 += 1 + if last_backward_id_1 < self.micro_batch_num * 2 - 1: + self.micro_batch_id_dict[i].append([str(f"{last_backward_id_1 + 1}B"), 6]) + last_backward_id_1 += 1 + else: + if last_backward_id_1 < self.micro_batch_num * 2 - 1: + self.micro_batch_id_dict[i].append([str(f"{last_backward_id_1 + 1}B"), 6]) + last_backward_id_1 += 1 + if last_backward_id_0 < self.micro_batch_num - 1: + self.micro_batch_id_dict[i].append([str(f"{last_backward_id_0 + 1}B"), 6]) + last_backward_id_0 += 1 + + def load_pp_info(self): + rank_id = list(self._data_map.keys())[0] + profiler_db_path = self._data_map[rank_id] + db_path = os.path.join(profiler_db_path, Constant.SINGLE_OUTPUT, f"ascend_pytorch_profiler_{rank_id}.db") + if not os.path.exists(profiler_db_path): + logger.error(f"Db_file: {db_path} not exist.") + return + try: + service = DatabaseService(db_path) + service.add_table_for_query("META_DATA", ["name", "value"]) + df = service.query_data().get("META_DATA", None) + if df is None: + logger.warning(f"There is no META_DATA in {db_path}.") + return + pp_info = df.loc[df["name"] == "pp_info", "value"] + if pp_info.empty: + logger.warning("pp_info not in profiling files, please input manually.") + return + else: + pp_info = json.loads(pp_info.values[0]) + self.micro_batch_num = pp_info.get("microbatch_num") + self.pp_type = pp_info.get("pp_type").lower() + except Exception as err: + logger.error(err) + logger.error("pp_info not in profiling files, please input manually.") + + def mapper_func_for_dualpipev(self, context): + return context.wait( + context.map( + self._mapper_func_for_dualpipev, + self._get_rank_db(), + analysis_class=self._recipe_name, + rank_pp_stage_map=self.map_rank_pp_stage(self.distributed_args), + pp_stage_mstx_num=self.pp_stage_mstx_num, + micro_batch_id_dict=self.micro_batch_id_dict + ) + ) + + def run(self, context): + if self.distributed_args is None: + logger.warning("The parallel strategy is lost.") + if self.pp_type == "dualpipev": + self.calculate_micro_batch_id_for_dualpipev() + res = self.mapper_func_for_dualpipev(context) # 忽略返回值 + else: + res = self.mapper_func(context) # 忽略返回值 + if res: + logger.info("PPChart finished.") + + def _mapper_func_for_dualpipev(self, data_map, analysis_class, rank_pp_stage_map, pp_stage_mstx_num, + micro_batch_id_dict): + """ + rank_pp_stage_map: 记录rank与pp_stage的映射,可以知道某个rank属于哪个pp_stage + pp_stage_mstx_num: 每个pp_stage预期的前反向的总打点数 + micro_batch_id_dict: 每个pp_stage的microbatch_id信息以及属于dualpipeV的哪个阶段,示例如下 + { + 0: 
[ ["0", 0], [ "2", 0], ..., ["7F+13B", 3], ...] + .... + } + """ + profiler_db_path = data_map.get(Constant.PROFILER_DB_PATH) + df = PPChartExport(profiler_db_path, analysis_class).read_export_db() + if df is None or df.empty: + logger.warning(f"There is no mstx data in {profiler_db_path}.") + return + rank_id = data_map.get(Constant.RANK_ID) + pp_stage = rank_pp_stage_map.get(rank_id) + if pp_stage is None: + logger.error(f"The rank {rank_id} does not belong to any PP stage.") + return + df = filter_non_overlapping(df) + df["name"] = "" + df["type"] = 0 + + def match_mstx_name(group): + if len(group) != pp_stage_mstx_num[pp_stage]: + logger.error(f"The number of mstx_count should be {pp_stage_mstx_num[pp_stage]}, not {len(group)}.") + return group + for idx, (i, row) in enumerate(group.iterrows()): + micro_batch_id_info = micro_batch_id_dict[pp_stage][idx] + group.at[i, "name"] = micro_batch_id_info[0] + group.at[i, "type"] = micro_batch_id_info[1] + return group + df = df.groupby("step").apply(match_mstx_name) + result = df[["name", "startNs", "endNs", "type"]] + self.dump_data(data=result, file_name="", table_name=self.STEP_TASK_INFO, index=False, + custom_db_path=data_map.get(Constant.PROFILER_DB_PATH)) + + def _mapper_func(self, data_map, analysis_class): + profiler_db_path = data_map.get(Constant.PROFILER_DB_PATH) + df = PPChartExport(profiler_db_path, analysis_class).read_export_db() + if df is None or df.empty: + logger.warning(f"There is no mstx data in {profiler_db_path}.") + return + df["name"] = df["msg"].apply(lambda x: "FP" if "forward" in x.lower() else "BP") + df['type'] = df['name'].map({'FP': 0, 'BP': 1}) + result = df[["name", "startNs", "endNs", "type"]] + self.dump_data(data=result, file_name="", table_name=self.STEP_TASK_INFO, index=False, + custom_db_path=data_map.get(Constant.PROFILER_DB_PATH)) \ No newline at end of file diff --git a/profiler/msprof_analyze/docs/pp_chart.md b/profiler/msprof_analyze/docs/pp_chart.md new file mode 100644 index 0000000000000000000000000000000000000000..bbbbd56121f9d8f011b3242fff4c39ba2da6ce77 --- /dev/null +++ b/profiler/msprof_analyze/docs/pp_chart.md @@ -0,0 +1,115 @@ +# pp流水图的采集、分析和显示 + +## 简介 +pp流水图指的是将实际pp域内的流水排布进行可视化呈现,让用户更直观地看到各个rank的前反向、通信的时间分布。 +* 重点介绍怎么采集、怎么用mstt工具处理以及MindStudio Insight里面的呈现效果。 + +## 操作指导 + +用户想看到pp流水图,需要按照以下三个步骤操作。 + +### 1. profiling数据采集 + +MindSpeed-LLM等框架里面已经集成了Ascend Pytorch Profiler采集接口,配置相关参数即可采集实际运行过程中的profiling数据。 + +但是前反向并不能直接呈现出来,可以使用msproftx打点接口,在代码里面的前反向相关函数前后打点,就可以在timeline上的Ascend HardWare层直观地看到前反向的信息。 + +tips:如果用户只关注pp流水图,那么可以设置将profiler_level设置为最低等级Level_none。Level1及以上级别可以除了可以看到前反向、通信,还可以看到send 和 recv的连线。 + + +**重要说明**: +* 采集数据时,需要将profiling数据导出格式设置为db。 +* 以下仅为打点示例,需要根据用户实际代码,准确找到前反向函数的位置,参考下面用装饰器的方式实现打点。 + +**打点:** +1. 传统pipeline(不开dualpipe),在```megatron/core/pipeline_parallel/schedules.py```里面添加如下代码(添加在```backward_step```函数定义的后面): +```python +import torch_npu +def step_wrapper(func, msg: str): + def wrapper(*args, **kwargs): + new_msg = {"name": msg} + mstx_state_step_range_id = torch_npu.npu.mstx.range_start(str(new_msg), torch_npu.npu.current_stream()) + out = func(*args, **kwargs) + if mstx_state_step_range_id is not None: + torch_npu.npu.mstx.range_end(mstx_state_step_range_id) + mstx_state_step_range_id = None + return out + return wrapper + +forward_step = step_wrapper(forward_step, "forward_step") +backward_step = step_wrapper(backward_step, "backward_step") +``` + +2. 
DualpipeV2,找到前反向代码,在```mindspeed/core/pipeline_parallel/dualpipev/dualpipev_schedules.py```里面添加如下代码(添加在```forward_backward_pipeline_with_cutinhalf```函数定义的前面):
+```python
+import torch_npu
+def step_wrapper(func, msg: str):
+    def wrapper(*args, **kwargs):
+        new_msg = {"name": msg}
+        if msg == "forward_step_with_model_graph" and kwargs.get("extra_block_kwargs") is not None:
+            new_msg["name"] = "forward_backward_overlaping"
+        if "current_microbatch" in kwargs:
+            new_msg["current_microbatch"] = kwargs["current_microbatch"]
+        if msg == "WeightGradStore_pop" and len(WeightGradStore.cache) == 0:
+            mstx_state_step_range_id = None
+        else:
+            mstx_state_step_range_id = torch_npu.npu.mstx.range_start(str(new_msg), torch_npu.npu.current_stream())
+        out = func(*args, **kwargs)
+        if mstx_state_step_range_id is not None:
+            torch_npu.npu.mstx.range_end(mstx_state_step_range_id)
+            mstx_state_step_range_id = None
+        return out
+    return wrapper
+
+forward_step_with_model_graph = step_wrapper(forward_step_with_model_graph, "forward_step_with_model_graph")
+forward_step_no_model_graph = step_wrapper(forward_step_no_model_graph, "forward_step_no_model_graph")
+backward_step_with_model_graph = step_wrapper(backward_step_with_model_graph, "backward_step_with_model_graph")
+backward_step = step_wrapper(backward_step, "backward_step")
+WeightGradStore.pop = step_wrapper(WeightGradStore.pop, "WeightGradStore_pop")
+```
+
+同时,采集profiling数据时,如果使用的是MindSpeed,未使用MindSpeed-LLM,需要在prof定义(```prof = torch_npu.profiler.profile(...```)的后面添加metadata代码:
+```
+prof.add_metadata('pp_info', json.dumps(
+    {
+        'pp_type': 'dualpipev',
+        'microbatch_num': 10,
+    }
+))
+# microbatch_num根据公式计算实际的值:microbatch_num = global_batch_size // micro_batch_size // data_parallel_size
+```
+如果使用MindSpeed-LLM,在```mindspeed-llm/training/training.py```中```prof.add_metadata_json('distributed_args'...```的后面添加metadata代码:
+```
+prof.add_metadata('pp_info', json.dumps(
+    {
+        'pp_type': args.schedules_method,
+        'microbatch_num': args.global_batch_size // args.micro_batch_size // args.data_parallel_size
+    }
+))
+```
+### 2. mstt工具处理
+
+**命令行使能:**
+```
+msprof-analyze cluster -m pp_chart -d ./cluster_data
+```
+**参数说明:**
+* `-d` 第一步打点后采集到的集群数据路径
+* 其余参数:与cluster集群分析功能支持的参数一致,详见[参数列表](../cluster_analyse/README.md)
+
+**输出数据:**
+* 存储位置:每个rank的数据里面ASCEND_PROFILER_OUTPUT/ascend_pytorch_profiler_{rank_id}.db里面新增一张表StepTaskInfo
+* 数据表名:StepTaskInfo
+![输出结果展示](img/clutser_time_summary.png)
+
+**字段说明:**
+
+| 字段名 | 类型 | 含义 |
+| ------ | ---- | ---- |
+| name | TEXT | 前反向信息,对应pp流水图上色块显示的名字 |
+| startNs | INTEGER | 在device上开始时间 |
+| endNs | INTEGER | 在device上结束时间 |
+| type | INTEGER | 类型,不同类型显示不同颜色 |
+
+### 3. 
MindStudio Insight呈现 + diff --git a/profiler/msprof_analyze/prof_common/constant.py b/profiler/msprof_analyze/prof_common/constant.py index 466368bf4ee34224cee2cfe8405b6a1f3311b8d1..f327c74fab92a99b98e623137554220a9d8c74af 100644 --- a/profiler/msprof_analyze/prof_common/constant.py +++ b/profiler/msprof_analyze/prof_common/constant.py @@ -452,6 +452,8 @@ class Constant(object): UINT32_BITS = 32 UINT32_MASK = 0xffffffff + INVALID_RANK_NUM = 4294967295 + # slow rank MAX_DIXON_NUM = 100 DIXON_THRESHOLD_1 = 7 @@ -484,3 +486,24 @@ class Constant(object): TABLE_MEMORY_RECORD = "MEMORY_RECORD" TABLE_STEP_TIME = "STEP_TIME" + # communication task type + NOTIFY_RECORD = "Notify_Record" + NOTIFY_WAIT = "Notify_Wait" + + +class ProfilerTableConstant: + + # COMMUNICATION OP + OP_ID = "opId" + OP_NAME = "opName" + START_NS = "startNS" + END_NS = "endNS" + CONNECTION_ID = "connectionId" + GROUP_NAME = "groupName" + RELAY = "relay" + RETRY = "retry" + DATA_TYPE = "dataType" + ALG_TYPE = "algType" + COUNT = "count" + OP_TYPE = "opType" + WAIT_NS = "waitNS" \ No newline at end of file diff --git a/profiler/msprof_analyze/prof_common/db_manager.py b/profiler/msprof_analyze/prof_common/db_manager.py index 1c95e2eec123cf7378c81f5cf355a1ee7bb3bf2f..ba757e6ed8397e9b07a295906ed675946df96665 100644 --- a/profiler/msprof_analyze/prof_common/db_manager.py +++ b/profiler/msprof_analyze/prof_common/db_manager.py @@ -15,7 +15,6 @@ import os import sqlite3 -from typing import List from msprof_analyze.cluster_analyse.common_func.empty_class import EmptyClass from msprof_analyze.cluster_analyse.common_func.tables_config import TablesConfig @@ -190,17 +189,6 @@ class DBManager: cls.destroy_db_connect(conn, curs) return res - @classmethod - def get_table_columns_name(cls, curs: any, table: any) -> List[str]: - sql = f"PRAGMA table_info({table})" - try: - curs.execute(sql) - columns = curs.fetchall() - except sqlite3.Error as err: - logger.error(err) - return [] - return [column[1] for column in columns] - @classmethod def fetch_all_data(cls: any, curs: any, sql: str, param: tuple = None, is_dict: bool = True) -> list: """ @@ -261,6 +249,21 @@ class DBManager: cls.insert_data_into_table(conn, table_name, data) cls.destroy_db_connect(conn, curs) + @classmethod + def check_columns_exist(cls, curs: any, table_name: str, columns: set) -> any: + """ + check columns exist in table, return empty set if none of them exist, else return the set of existing columns + """ + if not isinstance(curs, sqlite3.Cursor): + return None + try: + curs.execute(f"PRAGMA table_info({table_name})") + table_columns = {col[1] for col in curs.fetchall()} + return columns & table_columns + except sqlite3.Error as err: + logger.error(err) + return None + class CustomizedDictFactory: @staticmethod diff --git a/profiler/msprof_analyze/prof_exports/p2p_pairing_export.py b/profiler/msprof_analyze/prof_exports/p2p_pairing_export.py new file mode 100644 index 0000000000000000000000000000000000000000..effbcb55a0c35dafeef4813942135e256f60b194 --- /dev/null +++ b/profiler/msprof_analyze/prof_exports/p2p_pairing_export.py @@ -0,0 +1,70 @@ +# Copyright (c) 2025, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from string import Template +from msprof_analyze.prof_exports.base_stats_export import BaseStatsExport + + +QUERY = Template(""" +SELECT + co.opName AS "$opNameId", + siii.value AS "$opName", + co.startNs AS "$startTime", + co.endNs AS "$endTime", + rdm.rankId AS "$globalRank", + cti.srcRank AS "$srcRank", + cti.dstRank AS "$dstRank", + siiii.value AS "$taskType", + sii.value AS "$coGroupName", + si.value AS "$ctiGroupName" +FROM + COMMUNICATION_TASK_INFO cti + LEFT JOIN COMMUNICATION_OP co on cti.opId = co.opId + CROSS JOIN RANK_DEVICE_MAP rdm + JOIN STRING_IDS si on cti.groupName = si.id + JOIN STRING_IDS sii on co.groupName = sii.id + JOIN STRING_IDS siii on co.opName = siii.id + JOIN STRING_IDS siiii on cti.taskType = siiii.id +""") + + +class P2PPairingExport(BaseStatsExport): + + CO_OP_NAME = "opNameId" + OP_NAME = "opName" + START_TIME = "startTime" + END_TIME = "endTime" + GLOBAL_RANK = "globalRank" + SRC_RANK = "srcRank" + DST_RANK = "dstRank" + TASK_TYPE = "taskType" + CO_GROUP_NAME = "coGroupName" + CTI_GROUP_NAME = "ctiGroupName" + + + def __init__(self, db_path, recipe_name): + super().__init__(db_path, recipe_name) + self._query = QUERY.safe_substitute( + opNameId=self.CO_OP_NAME, + opName=self.OP_NAME, + startTime=self.START_TIME, + endTime=self.END_TIME, + globalRank=self.GLOBAL_RANK, + srcRank=self.SRC_RANK, + dstRank=self.DST_RANK, + taskType=self.TASK_TYPE, + coGroupName=self.CO_GROUP_NAME, + ctiGroupName=self.CTI_GROUP_NAME + ) diff --git a/profiler/msprof_analyze/prof_exports/pp_chart_export.py b/profiler/msprof_analyze/prof_exports/pp_chart_export.py new file mode 100644 index 0000000000000000000000000000000000000000..40103f04ccf5826832076ab53d9255c0484e5fff --- /dev/null +++ b/profiler/msprof_analyze/prof_exports/pp_chart_export.py @@ -0,0 +1,57 @@ +# Copyright (c) 2025, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from msprof_analyze.prof_common.db_manager import DBManager +from msprof_analyze.prof_common.constant import Constant +from msprof_analyze.prof_exports.base_stats_export import BaseStatsExport + + +class PPChartExport(BaseStatsExport): + QUERY = """ + SELECT + {} + MSG_IDS.value AS msg, + TASK.startNs, + TASK.endNs + FROM + MSTX_EVENTS + JOIN + TASK ON MSTX_EVENTS.connectionId = TASK.connectionId + JOIN + STRING_IDS AS MSG_IDS ON MSTX_EVENTS.message = MSG_IDS.id + {} + WHERE + msg LIKE '%forward%' + OR msg LIKE '%backward%' + OR msg LIKE '%WeightGradStore_pop%' + ORDER BY + TASK.startNs + """ + + def __init__(self, db_path, recipe_name): + super().__init__(db_path, recipe_name) + self._query = self._build_query(db_path) + + def _build_query(self, db_path): + str1 = "0 AS step," + str2 = "" + if DBManager.check_tables_in_db(db_path, Constant.TABLE_STEP_TIME): + str1 = "step_time.id AS step," + str2 = """ + LEFT JOIN STEP_TIME step_time + ON TASK.startNs >= step_time.startNs + AND TASK.endNs <= step_time.endNs + """ + return self.QUERY.format(str1, str2) diff --git a/profiler/msprof_analyze/test/ut/cluster_analyse/recipes/test_pp_chart.py b/profiler/msprof_analyze/test/ut/cluster_analyse/recipes/test_pp_chart.py new file mode 100644 index 0000000000000000000000000000000000000000..5a21c032bed2d57606a0cd2e739d0510231f8236 --- /dev/null +++ b/profiler/msprof_analyze/test/ut/cluster_analyse/recipes/test_pp_chart.py @@ -0,0 +1,77 @@ +# Copyright (c) 2025, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +import unittest +from unittest import mock +import pandas as pd + +from msprof_analyze.cluster_analyse.common_func.context import Context +from msprof_analyze.cluster_analyse.recipes.pp_chart.pp_chart import PPChart + +NAMESPACE = "msprof_analyze.cluster_analyse.recipes" + + +class TestClusterTimeCompareSummary(unittest.TestCase): + def test_calculate_micro_batch_id_for_dualpipev_when_pp_size_4_and_num_microbatches_10(self): + expected_pp_stage_mstx_num = { + 0: 44, + 1: 32, + 2: 30, + 3: 29 + } + expected_micro_batch_id_dict = { + 0: [['0', 0], ['1', 0], ['2', 0], ['3', 0], ['4', 0], ['5', 0], ['6', 1], ['10', 1], ['logits', 2], + ['10b', 2], ['10w', 2], ['11', 2], ['logits', 2], ['11b', 2], ['11w', 2], ['12', 2], ['logits', 2], + ['12b', 2], ['12w', 2], ['13', 2], ['logits', 3], ['7F+13B', 3], ['14F+0B', 3], ['logits', 3], + ['8F+14B', 3], ['15F+1B', 3], ['logits', 3], ['9F+15B', 3], ['16F+2B', 3], ['logits', 4], ['16B', 4], + ['17F+3B', 4], ['logits', 4], ['17B', 4], ['18F+4B', 4], ['logits', 4], ['18B', 4], ['19F+5B', 4], + ['logits', 5], ['19B', 5], ['6B', 6], ['7B', 6], ['8B', 6], ['9B', 6]], + 1: [['0', 0], ['1', 0], ['2', 0], ['3', 0], ['4', 1], ['10', 1], ['5', 1], ['11', 1], ['10b', 2], + ['10w', 2], ['12', 2], ['11b', 2], ['11w', 2], ['13', 2], ['6F+12B', 3], ['14F+0B', 3], ['7F+13B', 3], + ['15F+1B', 3], ['8F+14B', 3], ['16F+2B', 3], ['9F+15B', 3], ['17F+3B', 3], ['16B', 4], ['18F+4B', 4], + ['17B', 4], ['19F+5B', 4], ['18B', 5], ['6B', 5], ['19B', 6], ['7B', 6], ['8B', 6], ['9B', 6]], + 2: [['0', 0], ['1', 0], ['2', 1], ['10', 1], ['3', 1], ['11', 1], ['4', 1], ['12', 1], ['10b', 2], + ['10w', 2], ['13', 2], ['5F+11B', 3], ['14F+0B', 3], ['6F+12B', 3], ['15F+1B', 3], ['7F+13B', 3], + ['16F+2B', 3], ['8F+14B', 3], ['17F+3B', 3], ['9F+15B', 3], ['18F+4B', 3], ['16B', 4], ['19F+5B', 4], + ['17B', 5], ['6B', 5], ['18B', 5], ['7B', 6], ['19B', 6], ['8B', 6], ['9B', 6]], + 3: [['0', 1], ['10', 1], ['1', 1], ['11', 1], ['2', 1], ['12', 1], ['3', 1], ['13', 1], ['4F', 3], + ['10B', 3], ['14F+0B', 3], ['5F+11B', 3], ['15F+1B', 3], ['6F+12B', 3], ['16F+2B', 3], ['7F+13B', 3], + ['17F+3B', 3], ['8F+14B', 3], ['18F+4B', 3], ['9F+15B', 3], ['19F+5B', 3], ['16B', 5], ['6B', 5], + ['17B', 5], ['7B', 5], ['18B', 6], ['8B', 6], ['19B', 6], ['9B', 6]] + } + with (mock.patch(NAMESPACE + ".base_recipe_analysis.BaseRecipeAnalysis.load_distributed_args", + return_value={PPChart.PP_SIZE: 4}), + mock.patch(NAMESPACE + ".pp_chart.pp_chart.PPChart.load_pp_info")): + pp_chart_instance = PPChart({}) + pp_chart_instance.micro_batch_num = 10 + pp_chart_instance.calculate_micro_batch_id_for_dualpipev() + self.assertEqual(pp_chart_instance.pp_stage_mstx_num, expected_pp_stage_mstx_num) + self.assertEqual(pp_chart_instance.micro_batch_id_dict, expected_micro_batch_id_dict) + + def test_pp_chart_should_generate_table_when_pp_info_not_existed(self): + df = pd.DataFrame({"step": [0, 0], "msg": ["forward_step", "backward_step"], "startNs": [1, 4], + "endNs": [2, 5]}) + with mock.patch(NAMESPACE + ".base_recipe_analysis.BaseRecipeAnalysis.load_distributed_args", + return_value={}), \ + mock.patch(NAMESPACE + ".base_recipe_analysis.BaseRecipeAnalysis.dump_data"), \ + mock.patch(NAMESPACE + ".pp_chart.pp_chart.PPChart.load_pp_info"), \ + mock.patch("msprof_analyze.prof_exports.base_stats_export.BaseStatsExport.read_export_db", + return_value=df): + with Context.create_context() as context: + pp_chart_instance = PPChart({}) + pp_chart_instance.micro_batch_num = 10 + pp_chart_instance.run(context) + 
self.assertFalse(pp_chart_instance.micro_batch_id_dict)
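A note on the p2p_pairing naming rule described in `generate_p2p_connection_index`: the communication domain id and the transfer direction are derived purely from the HCCL op name through the regular expressions defined at the top of the recipe. Below is a minimal, self-contained sketch of that extraction; the op name is a made-up example that matches `P2P_OP_NAME_PATTERN`, and the local ranks 0 and 1 are assumed rather than taken from real profiling data.

```python
import re

# Patterns copied from the P2PPairing recipe above.
P2P_OP_NAME_PATTERN = r"^hcom_([Ss]end|[Rr](ecv|eceive))__\d+_\d+_\d+$"
DOMAIN_ID_EXTRACT_PATTERN = r"__(\d+)_\d+_\d+"
RECEIVE_OP_MATCH_PATTERN = r"[Rr]ecv|[Rr]eceive"

op_name = "hcom_send__233_58_1"  # hypothetical op name, chosen to match P2P_OP_NAME_PATTERN

assert re.match(P2P_OP_NAME_PATTERN, op_name)
domain_id = re.search(DOMAIN_ID_EXTRACT_PATTERN, op_name).group(1)  # -> "233"
is_receive = bool(re.search(RECEIVE_OP_MATCH_PATTERN, op_name))     # -> False for a send op

# With local src rank 0, dst rank 1 and no earlier send from 0 to 1 in this domain,
# the recipe would emit opConnectionId "233_0_1_0" (domain_src_dst_index).
print(domain_id, is_receive)
```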
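The DualPipeV phase layout that pp_chart reconstructs comes from `PPChart.generate_dualpipev_schedule`, which is a plain static method and can be inspected without any profiling data. A small usage sketch with the same configuration as the unit test above (pp_size=4, 10 micro batches):

```python
from msprof_analyze.cluster_analyse.recipes.pp_chart.pp_chart import PPChart

# Phase name -> per-pp-stage counts (list index 0..3 is the pp stage id).
schedule = PPChart.generate_dualpipev_schedule(pp_size=4, num_microbatches=10)
for phase, counts in schedule.items():
    print(phase, counts)
# 'warmup' comes out as [6, 4, 2, 0]: stage 0 runs six warm-up forwards and the last
# stage runs none, matching the warm-up entries in expected_micro_batch_id_dict above.
```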
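Both recipes write their deliverables back into each rank's `ascend_pytorch_profiler_{rank_id}.db`: pp_chart adds a `StepTaskInfo` table and p2p_pairing appends an `opConnectionId` column to `COMMUNICATION_OP`. The sketch below is one way to spot-check those outputs after the recipes have run; the db path is a placeholder, and it assumes `COMMUNICATION_OP`/`STRING_IDS` tables exist (i.e. data was collected at a level that records communication ops).

```python
import sqlite3

import pandas as pd

db_path = "ASCEND_PROFILER_OUTPUT/ascend_pytorch_profiler_0.db"  # placeholder path for rank 0

with sqlite3.connect(db_path) as conn:
    # StepTaskInfo: one row per forward/backward (or DualPipeV phase) block rendered in the pp chart.
    steps = pd.read_sql("SELECT name, startNs, endNs, type FROM StepTaskInfo ORDER BY startNs", conn)
    # opConnectionId is NULL except for Send/Recv operators; opName is a STRING_IDS reference.
    p2p = pd.read_sql(
        """SELECT si.value AS opName, co.opConnectionId
           FROM COMMUNICATION_OP co
           JOIN STRING_IDS si ON co.opName = si.id
           WHERE co.opConnectionId IS NOT NULL""",
        conn,
    )

print(steps.head())
print(p2p.head())
```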