diff --git a/profiler/msprof_analyze/cluster_analyse/common_func/context.py b/profiler/msprof_analyze/cluster_analyse/common_func/context.py
index cde351508c0e814867d224b7ce5c454e0813a71b..6735783a83148aa37025ce0fa95868b86773ec11 100644
--- a/profiler/msprof_analyze/cluster_analyse/common_func/context.py
+++ b/profiler/msprof_analyze/cluster_analyse/common_func/context.py
@@ -16,6 +16,7 @@
 import os
 from functools import partial
 from concurrent import futures
+from collections import defaultdict
 
 from msprof_analyze.prof_common.constant import Constant
 from msprof_analyze.prof_common.logger import get_logger
@@ -68,6 +69,7 @@ class ConcurrentContext(Context):
         super().__init__()
         self._custom = executor is None
         self._executor = executor or futures.ProcessPoolExecutor(max_workers=os.cpu_count())
+        self.future_dict = defaultdict(list)
 
     def __enter__(self):
         if self._executor is None:
@@ -93,3 +95,11 @@ class ConcurrentContext(Context):
 
     def wait(self, waitable):
         return waitable
+
+    def submit(self, name, func, *args, **kwargs):
+        self.future_dict[name].append(self._executor.submit(func, *args, **kwargs))
+
+    def wait_all_futures(self):
+        for future_list in self.future_dict.values():
+            for future in future_list:
+                future.result()
diff --git a/profiler/msprof_analyze/cluster_analyse/recipes/base_recipe_analysis.py b/profiler/msprof_analyze/cluster_analyse/recipes/base_recipe_analysis.py
index a6ad06427eff6b9bd66cd05f687c631c7bed0eeb..0289b1747a642f40220c43784ef2231d4c06b0bf 100644
--- a/profiler/msprof_analyze/cluster_analyse/recipes/base_recipe_analysis.py
+++ b/profiler/msprof_analyze/cluster_analyse/recipes/base_recipe_analysis.py
@@ -218,6 +218,7 @@ class BaseRecipeAnalysis(ABC):
             step_time = DBManager.fetch_all_data(cursor, sql)
         except Exception as err:
             logger.error(err)
+            return step_range
         finally:
             DBManager.destroy_db_connect(conn, cursor)
 
diff --git a/profiler/msprof_analyze/cluster_analyse/recipes/cluster_time_compare_summary/__init__.py b/profiler/msprof_analyze/cluster_analyse/recipes/cluster_time_compare_summary/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/profiler/msprof_analyze/cluster_analyse/recipes/cluster_time_compare_summary/cluster_time_compare_summary.py b/profiler/msprof_analyze/cluster_analyse/recipes/cluster_time_compare_summary/cluster_time_compare_summary.py
new file mode 100644
index 0000000000000000000000000000000000000000..52a733992a64a33b9968c525960e251682684108
--- /dev/null
+++ b/profiler/msprof_analyze/cluster_analyse/recipes/cluster_time_compare_summary/cluster_time_compare_summary.py
@@ -0,0 +1,147 @@
+# Copyright (c) 2025, Huawei Technologies Co., Ltd.
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
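+"""
+ClusterTimeCompareSummary recipe: compare the ClusterTimeSummary table of the
+current cluster profiling output with a baseline output passed via --bp, and
+write per-rank, per-step <metric>/<metric>Base/<metric>Diff columns to the
+ClusterTimeCompareSummary table of the analysis database.
+"""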
+
+import os
+
+import pandas as pd
+
+from msprof_analyze.cluster_analyse.recipes.base_recipe_analysis import BaseRecipeAnalysis
+from msprof_analyze.prof_common.constant import Constant
+from msprof_analyze.prof_common.database_service import DatabaseService
+from msprof_analyze.prof_common.db_manager import DBManager
+from msprof_analyze.prof_common.logger import get_logger
+from msprof_analyze.prof_common.path_manager import PathManager
+
+logger = get_logger()
+
+
+class ClusterTimeCompareSummary(BaseRecipeAnalysis):
+    BP = "bp"  # parameter holding the baseline profiling path to compare against
+    TABLE_CLUSTER_TIME_COMPARE_SUMMARY = "ClusterTimeCompareSummary"
+    CLUSTER_TIME_SUMMARY_COLUMNS = [
+        "rank",
+        "step",
+        "stepTime",
+        "computation",
+        "communicationNotOverlapComputation",
+        "communicationOverlapComputation",
+        "communication",
+        "free",
+        "communicationWaitStageTime",
+        "communicationTransmitStageTime",
+        "memory",
+        "memoryNotOverlapComputationCommunication",
+    ]
+
+    def __init__(self, params):
+        super().__init__(params)
+        self.db_path = os.path.join(self._collection_dir, Constant.CLUSTER_ANALYSIS_OUTPUT,
+                                    Constant.DB_CLUSTER_COMMUNICATION_ANALYZER)
+        self.base_db_path = os.path.join(self._extra_args.get(self.BP, ""), Constant.CLUSTER_ANALYSIS_OUTPUT,
+                                         Constant.DB_CLUSTER_COMMUNICATION_ANALYZER)
+        self.compare_result = pd.DataFrame()
+
+    @property
+    def base_dir(self):
+        return os.path.basename(os.path.dirname(__file__))
+
+    @classmethod
+    def add_parser_argument(cls, parser):
+        BaseRecipeAnalysis.add_parser_argument(parser)
+        parser.add_argument('--bp', type=PathManager.expanduser_for_argumentparser, default="",
+                            help="base profiling data path")
+
+    def run(self, context=None):
+        logger.info("ClusterTimeCompareSummary init.")
+        if not self.check_params_is_valid():
+            logger.warning("Invalid params, skip ClusterTimeCompareSummary.")
+            return
+        self.get_compare_data()
+        self.save_db()
+
+    def check_params_is_valid(self) -> bool:
+        base_path = self._extra_args.get(self.BP, "")
+        if not base_path:
+            logger.error("Must specify the --bp parameter.")
+            return False
+        if self._export_type != Constant.DB:
+            logger.error("For cluster_time_compare_summary, the export_type parameter only supports db.")
+            return False
+        try:
+            PathManager.check_input_directory_path(base_path)  # validate the baseline directory
+        except RuntimeError:
+            logger.error(f"{base_path} is not valid.")
+            return False
+        if not DBManager.check_tables_in_db(self.db_path, Constant.TABLE_CLUSTER_TIME_SUMMARY):
+            logger.error(f"{Constant.TABLE_CLUSTER_TIME_SUMMARY} in {self.db_path} does not exist.")
+            return False
+        if not DBManager.check_tables_in_db(self.base_db_path, Constant.TABLE_CLUSTER_TIME_SUMMARY):
+            logger.error(f"{Constant.TABLE_CLUSTER_TIME_SUMMARY} in {self.base_db_path} does not exist.")
+            return False
+        return True
+
+    def get_compare_data(self):
+        cluster_time_summary_df = self._query_cluster_time_summary(self.db_path)
+        base_cluster_time_summary_df = self._query_cluster_time_summary(self.base_db_path)
+        if cluster_time_summary_df.empty or base_cluster_time_summary_df.empty:
+            return
+        # filter by step_id
+        if self._step_id != Constant.VOID_STEP:
+            step_ids = cluster_time_summary_df['step'].unique()
+            base_step_ids = base_cluster_time_summary_df['step'].unique()
+            if self._step_id in step_ids and self._step_id in base_step_ids:
+                cluster_time_summary_df = cluster_time_summary_df[cluster_time_summary_df['step'] == self._step_id]
+                base_cluster_time_summary_df = base_cluster_time_summary_df[
+                    base_cluster_time_summary_df['step'] == self._step_id]
+            else:
+                logger.error(f"Invalid step_id {self._step_id}: it must exist in both "
+                             f"{step_ids} and {base_step_ids}.")
+                return
+        # merge and compare
+        index_cols = ["rank", "step"]
+        current_df = cluster_time_summary_df.set_index(index_cols)
+        base_df = base_cluster_time_summary_df.set_index(index_cols).add_suffix("Base")
+        merged_df = current_df.join(base_df).reset_index()
+        columns_order = index_cols.copy()  # copy to avoid mutating index_cols via extend below
+        for col in self.CLUSTER_TIME_SUMMARY_COLUMNS:
+            if col in index_cols:
+                continue
+            base_col = f"{col}Base"
+            diff_col = f"{col}Diff"
+            if base_col not in merged_df or col not in merged_df:
+                logger.warning(f"Column {col} missing in ClusterTimeSummary tables.")
+                continue
+            merged_df[diff_col] = merged_df[col] - merged_df[base_col]
+            columns_order.extend([col, base_col, diff_col])
+        self.compare_result = merged_df[columns_order].dropna()
+        if len(self.compare_result) < len(current_df):
+            logger.warning(f"Dropped {len(current_df) - len(self.compare_result)} rows "
+                           f"due to unmatched rank-step pairs.")
+
+    def save_db(self):
+        if self.compare_result.empty:
+            logger.warning("No valid compare data, skip save_db for ClusterTimeCompareSummary.")
+            return
+        self.dump_data(self.compare_result, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER,
+                       self.TABLE_CLUSTER_TIME_COMPARE_SUMMARY, index=False)
+
+    def _query_cluster_time_summary(self, db_path):
+        database_service_for_db = DatabaseService(db_path, {})
+        database_service_for_db.add_table_for_query(Constant.TABLE_CLUSTER_TIME_SUMMARY,
+                                                    self.CLUSTER_TIME_SUMMARY_COLUMNS)
+        result_dict = database_service_for_db.query_data()
+        df = result_dict.get(Constant.TABLE_CLUSTER_TIME_SUMMARY)
+        if df is None or df.empty:
+            logger.warning(f"There is no {Constant.TABLE_CLUSTER_TIME_SUMMARY} data in {db_path}.")
+            return pd.DataFrame()
+        return df
diff --git a/profiler/msprof_analyze/cluster_analyse/recipes/cluster_time_summary/__init__.py b/profiler/msprof_analyze/cluster_analyse/recipes/cluster_time_summary/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/profiler/msprof_analyze/cluster_analyse/recipes/cluster_time_summary/cluster_time_summary.py b/profiler/msprof_analyze/cluster_analyse/recipes/cluster_time_summary/cluster_time_summary.py
new file mode 100644
index 0000000000000000000000000000000000000000..74366cc43a9f37ddc3ab752010133398622af042
--- /dev/null
+++ b/profiler/msprof_analyze/cluster_analyse/recipes/cluster_time_summary/cluster_time_summary.py
@@ -0,0 +1,305 @@
+# Copyright (c) 2025, Huawei Technologies Co., Ltd.
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
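+"""
+ClusterTimeSummary recipe: concurrently collect per-rank step time, step-trace
+breakdown, communication op time and async-memcpy time, split communication
+into transmit/wait stages, and aggregate everything into the
+ClusterTimeSummary table of the analysis database.
+"""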
+
+import os
+
+import pandas as pd
+
+from msprof_analyze.cluster_analyse.common_func.context import ConcurrentContext
+from msprof_analyze.cluster_analyse.common_func.table_constant import TableConstant
+from msprof_analyze.cluster_analyse.common_func.utils import double_hash
+from msprof_analyze.cluster_analyse.recipes.base_recipe_analysis import BaseRecipeAnalysis
+from msprof_analyze.cluster_analyse.recipes.communication_group_map.communication_group_map import \
+    CommunicationGroupMap
+from msprof_analyze.prof_common.constant import Constant
+from msprof_analyze.prof_common.database_service import DatabaseService
+from msprof_analyze.prof_common.db_manager import DBManager
+from msprof_analyze.prof_common.logger import get_logger
+from msprof_analyze.prof_exports.cluster_time_summary_export import CommunicationOpWithStepExport
+from msprof_analyze.prof_exports.cluster_time_summary_export import MemoryAndDispatchTimeExport
+
+logger = get_logger()
+
+
+class OverlapInfo:
+    def __init__(self, start, end, overlap_type):
+        self.start = start
+        self.end = end
+        self.type = overlap_type
+
+
+class ClusterTimeSummary(BaseRecipeAnalysis):
+    COMPUTING_TYPE = 0
+    COMMUNICATION_TYPE = 1
+    MEMORY_TYPE = 4
+    STEP_TIME = "step_time"
+    STEP_TRACE = "step_trace"
+    COMMUNICATION = "communication"
+    MEMORY = "memory"
+
+    def __init__(self, params):
+        super().__init__(params)
+        self.params = params
+        self.db_paths = self._get_rank_db()
+        self.stats_data = None
+
+    @property
+    def base_dir(self):
+        return os.path.basename(os.path.dirname(__file__))
+
+    @classmethod
+    def get_memory_not_overlap(cls, df: pd.DataFrame):
+        """Sweep the time-ordered intervals and sum the memory (async memcpy)
+        time that falls into free time, i.e. is not covered by computation or
+        communication intervals."""
+        memory_not_overlap_time = 0  # total memcpy time inside free periods
+        cur_block = OverlapInfo(df.iloc[0]["start"], df.iloc[0]["start"], -1)
+        for time_info in df.itertuples():
+            if cur_block.type == cls.MEMORY_TYPE:
+                tmp_start = cur_block.start
+                tmp_end = cur_block.end if time_info.start > cur_block.end else time_info.start
+                if tmp_start < tmp_end:
+                    memory_not_overlap_time += tmp_end - tmp_start
+            if time_info.start > cur_block.end:
+                cur_block.end = time_info.end
+                cur_block.type = time_info.type
+                cur_block.start = time_info.start
+            else:
+                cur_block.type = time_info.type if time_info.end > cur_block.end else cur_block.type
+                cur_block.start = cur_block.end if time_info.end > cur_block.end else time_info.end
+                cur_block.end = time_info.end if time_info.end > cur_block.end else cur_block.end
+        # account for the last block
+        if cur_block.type == cls.MEMORY_TYPE:
+            memory_not_overlap_time += cur_block.end - cur_block.start
+        return memory_not_overlap_time / Constant.TIME_UNIT_SCALE
+
+    @classmethod
+    def calculate_memory_time(cls, df: pd.DataFrame) -> pd.DataFrame:
+        filtered_df = df[df['type'].isin([cls.MEMORY_TYPE])].copy()
+        filtered_df['memory'] = filtered_df['end'] - filtered_df['start']
+        result = filtered_df.groupby(['step'])['memory'].sum().reset_index()
+        result['memory'] = result['memory'] / Constant.TIME_UNIT_SCALE
+        return result
+
+    def calculate_step_time(self, data_map, analysis_class):
+        profiler_db_path = data_map.get(Constant.PROFILER_DB_PATH)
+        rank_id = data_map.get(Constant.RANK_ID)
+        data_service = DatabaseService(profiler_db_path, {})
+        data_service.add_table_for_query(Constant.TABLE_STEP_TIME, ["id", "startNs", "endNs"])
+        df = data_service.query_data().get(Constant.TABLE_STEP_TIME)
+        if df is None or df.empty:
+            logger.warning(f"There is no {Constant.TABLE_STEP_TIME} data in {profiler_db_path}.")
+            return None
+        df["stepTime"] = (df["endNs"] - df["startNs"]) / Constant.TIME_UNIT_SCALE
+        result_df = df[["id", "stepTime"]].rename(columns={"id": "step"})
+        result_df.insert(0, "rank", rank_id)
+        return result_df
+
+    def calculate_step_trace_time(self, data_map, analysis_class):
+        analysis_db_path = data_map.get(Constant.ANALYSIS_DB_PATH)
+        rank_id = data_map.get(Constant.RANK_ID)
+        data_service = DatabaseService(analysis_db_path, {})
+        data_service.add_table_for_query(Constant.TABLE_STEP_TRACE, ["step", "computing",
+                                                                     "communication_not_overlapped", "overlapped",
+                                                                     "communication", "free"])
+        df = data_service.query_data().get(Constant.TABLE_STEP_TRACE)
+        if df is None or df.empty:
+            logger.warning(f"There is no {Constant.TABLE_STEP_TRACE} data in {analysis_db_path}.")
+            return None
+        df.insert(0, "rank", rank_id)
+        df["step"] = df["step"].astype(int)
+        return df
+
+    def calculate_communication_time(self, data_map, analysis_class):
+        profiler_db_path = data_map.get(Constant.PROFILER_DB_PATH)
+        step_range = data_map.get(Constant.STEP_RANGE)
+        df = CommunicationOpWithStepExport(profiler_db_path, analysis_class, step_range).read_export_db()
+        if df is None or df.empty:
+            logger.warning(f"There is no communication op data in {profiler_db_path}.")
+            return None
+        return df
+
+    def calculate_transmit_and_wait_df(self, communication_df):
+        transmit_and_wait_df = pd.DataFrame(columns=["rank", "step", "communicationWaitStageTime",
+                                                     "communicationTransmitStageTime"])
+        if communication_df.empty:
+            logger.warning("There is no communication op data in the cluster, "
+                           "skip transmit and wait time calculation.")
+            return transmit_and_wait_df
+
+        # map each group_name to its rank set
+        cluster_db_path = os.path.join(self.output_path, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER)
+        data_service = DatabaseService(cluster_db_path, {})
+        data_service.add_table_for_query(Constant.TABLE_COMMUNICATION_GROUP_MAPPING,
+                                         [TableConstant.RANK_SET, TableConstant.GROUP_ID])
+        df_dict = data_service.query_data()
+        rank_set_df = df_dict.get(Constant.TABLE_COMMUNICATION_GROUP_MAPPING, None)
+        if rank_set_df is None or rank_set_df.empty:
+            logger.error(f"There is no {Constant.TABLE_COMMUNICATION_GROUP_MAPPING} data in {cluster_db_path}.")
+            return transmit_and_wait_df
+
+        # convert strings such as "(2)" or "(2,4,6,8)" read from the rank_set
+        # column of CommunicationGroupMapping into sets of rank ids
+        def parse_rank_set(rank_set):
+            try:
+                return set(map(int, rank_set.strip('()').split(',')))
+            except Exception as e:
+                logger.error(f"Failed to parse rank_set: {rank_set}, error: {e}")
+                return set()
+
+        rank_set_df[TableConstant.RANK_SET] = rank_set_df[TableConstant.RANK_SET].apply(parse_rank_set)
+        # the group_name columns of the two tables use different types, so hash one side below
+        group_to_ranks = dict(zip(rank_set_df[TableConstant.GROUP_ID], rank_set_df[TableConstant.RANK_SET]))
+
+        # filter predicate: keep a group only if it contains all required ranks
+        def valid_group(group):
+            group_name = group.name[0]  # group.name is a (groupName, opName, step) tuple
+            required_ranks = group_to_ranks.get(group_name, set())
+            actual_ranks = set(group['rank'])
+            return required_ranks.issubset(actual_ranks)
+
+        communication_df["groupName"] = communication_df["groupName"].apply(double_hash)
+        filtered_df = (communication_df.groupby(["groupName", "opName", "step"], group_keys=False).
+                       filter(valid_group))
+        if filtered_df.empty:
+            logger.warning("No group satisfies the required rank set condition.")
+            return transmit_and_wait_df
+
+        # per communication op group: transmit time is the minimum communication
+        # time across ranks; the remainder of each rank's time is wait time
+        filtered_df["communicationTransmitStageTime"] = \
+            filtered_df.groupby(["groupName", "opName", "step"])["communication_time"].transform("min")
+        filtered_df["communicationWaitStageTime"] = \
+            filtered_df["communication_time"] - filtered_df["communicationTransmitStageTime"]
+        transmit_and_wait_df = filtered_df.groupby(["rank", "step"])[
+            ["communicationWaitStageTime", "communicationTransmitStageTime"]].sum().reset_index()
+        return transmit_and_wait_df
+
+    def calculate_memory_and_not_overlapped_time(self, data_map, analysis_class):
+        """
+        Expected result layout:
+        rank  step  memory  memoryNotOverlapComputationCommunication
+        0     1     120     150
+        0     2     130     150
+        """
+        columns = ["rank", "step", "memory", "memoryNotOverlapComputationCommunication"]
+        profiler_db_path = data_map.get(Constant.PROFILER_DB_PATH)
+        rank_id = data_map.get(Constant.RANK_ID)
+        step_range = data_map.get(Constant.STEP_RANGE)
+        df = MemoryAndDispatchTimeExport(profiler_db_path, analysis_class, step_range).read_export_db()
+        if df is None or df.empty:
+            logger.warning(f"Can not get memcpy task info from {profiler_db_path}.")
+            return pd.DataFrame(columns=columns)
+
+        memory_df = ClusterTimeSummary.calculate_memory_time(df)
+        memory_not_overlap_df = (df.groupby(["step"]).apply(ClusterTimeSummary.get_memory_not_overlap).
+                                 reset_index(name="memoryNotOverlapComputationCommunication"))
+        result_df = pd.merge(memory_df, memory_not_overlap_df, on='step', how='inner')
+        result_df.insert(0, "rank", rank_id)
+        return result_df
+
+    def aggregate_stats(self, context: ConcurrentContext):
+        def safe_concat(key: str) -> pd.DataFrame:
+            futures = context.future_dict.get(key, [])
+            df_list = [future.result() for future in futures]
+            valid_dfs = [df for df in df_list if df is not None and not df.empty]
+            return pd.concat(valid_dfs, ignore_index=True) if valid_dfs else pd.DataFrame()
+
+        # collect the per-rank DataFrames produced by the mapper stage
+        step_time_df = safe_concat(ClusterTimeSummary.STEP_TIME)
+        step_trace_df = safe_concat(ClusterTimeSummary.STEP_TRACE)
+        communication_df = safe_concat(ClusterTimeSummary.COMMUNICATION)
+        memory_df = safe_concat(ClusterTimeSummary.MEMORY)
+
+        # filter by step_id; without step_time/step_trace_time there is nothing to break down
+        step_time_df = self._filter_by_step_id(step_time_df)
+        step_trace_df = self._filter_by_step_id(step_trace_df)
+        if step_time_df.empty or step_trace_df.empty:
+            logger.error("No valid step_time/step_trace_time in cluster data, skipping analysis.")
+            return pd.DataFrame()
+
+        # fine-grained breakdown of communication time
+        transmit_and_wait_df = self.calculate_transmit_and_wait_df(communication_df)
+        # merge all per-rank, per-step metrics
+        all_dfs = [step_time_df, step_trace_df, transmit_and_wait_df, memory_df]
+        merged_df = all_dfs[0]
+        for df in all_dfs[1:]:
+            merged_df = pd.merge(merged_df, df, on=['rank', 'step'], how='outer')
+        # replace all NaN with 0
+        merged_df = merged_df.fillna(0)
+        # sort the merged DataFrame by rank and step
+        merged_df = merged_df.sort_values(by=['rank', 'step'])
+        merged_df["free"] = merged_df["free"] - merged_df["memoryNotOverlapComputationCommunication"]
+        # single-rank scenario: zero out communication transmit and wait time
+        if communication_df.empty or len(communication_df['rank'].unique()) == 1:
+            merged_df[['communicationWaitStageTime', 'communicationTransmitStageTime']] = 0
+        merged_df = merged_df.rename(columns={
+            'computing': 'computation',
+            'overlapped': 'communicationOverlapComputation',
+            'communication_not_overlapped': 'communicationNotOverlapComputation'})
+        return merged_df
+
+    def mapper_func(self, context: ConcurrentContext):
+        for db_map in self.db_paths:
+            context.submit(self.STEP_TIME, self.calculate_step_time, db_map, self._recipe_name)
+            context.submit(self.STEP_TRACE, self.calculate_step_trace_time, db_map, self._recipe_name)
+            context.submit(self.COMMUNICATION, self.calculate_communication_time, db_map, self._recipe_name)
+            context.submit(self.MEMORY, self.calculate_memory_and_not_overlapped_time, db_map, self._recipe_name)
+
+    def run(self, context: ConcurrentContext):
+        logger.info("ClusterTimeSummary init.")
+
+        if self._export_type != Constant.DB:
+            logger.error("cluster_time_summary only supports export_type db.")
+            return
+
+        # prepare: the CommunicationGroupMapping table is required
+        db_path = os.path.join(self.output_path, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER)
+        if not DBManager.check_tables_in_db(db_path, Constant.TABLE_COMMUNICATION_GROUP_MAPPING):
+            if not self.run_communication_group_map_recipe(context) or \
+                    not DBManager.check_tables_in_db(db_path, Constant.TABLE_COMMUNICATION_GROUP_MAPPING):
+                logger.error(f"Create {Constant.TABLE_COMMUNICATION_GROUP_MAPPING} table failed!")
+                return
+
+        # process and analyse the data
+        try:
+            self.mapper_func(context)
+            context.wait_all_futures()
+            self.stats_data = self.aggregate_stats(context)
+            self.save_db()
+        except Exception as err:
+            logger.error("Execute ClusterTimeSummary with exception: %s", str(err))
+
+    def run_communication_group_map_recipe(self, context):
+        """Run the CommunicationGroupMap recipe to create the CommunicationGroupMapping table."""
+        logger.info(f"Run CommunicationGroupMap recipe first to get the "
+                    f"{Constant.TABLE_COMMUNICATION_GROUP_MAPPING} table.")
+        try:
+            group_map_recipe = CommunicationGroupMap(self.params)
+            group_map_recipe.run(context)
+        except Exception as e:
+            logger.error(f"Run CommunicationGroupMap recipe failed: {e}!")
+            return False
+        return True
+
+    def save_db(self):
+        if self.stats_data is None or self.stats_data.empty:
+            logger.warning("No stats data, skip save_db for ClusterTimeSummary.")
+            return
+        self.dump_data(self.stats_data, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER,
+                       Constant.TABLE_CLUSTER_TIME_SUMMARY, index=False)
+
+    def _filter_by_step_id(self, df):
+        if self._step_id == Constant.VOID_STEP or 'step' not in df.columns:
+            return df
+        return df[df['step'] == self._step_id]
diff --git a/profiler/msprof_analyze/prof_common/constant.py b/profiler/msprof_analyze/prof_common/constant.py
index e5265321da9697de7214cc5b18555ac8cb986c1d..466368bf4ee34224cee2cfe8405b6a1f3311b8d1 100644
--- a/profiler/msprof_analyze/prof_common/constant.py
+++ b/profiler/msprof_analyze/prof_common/constant.py
@@ -117,6 +117,7 @@ class Constant(object):
     # result files type
     TEXT = "text"
     DB = "db"
+    NOTEBOOK = "notebook"
     INVALID = "invalid"
 
     # db name
@@ -138,6 +139,7 @@ class Constant(object):
     TABLE_CLUSTER_COMMUNICATION_MATRIX = "ClusterCommAnalyzerMatrix"
     TABLE_CLUSTER_COMMUNICATION_BANDWIDTH = "ClusterCommAnalyzerBandwidth"
     TABLE_CLUSTER_COMMUNICATION_TIME = "ClusterCommunicationTime"
+    TABLE_CLUSTER_TIME_SUMMARY = "ClusterTimeSummary"
 
     # data config key
     CONFIG = "config"
@@ -395,6 +397,7 @@ class Constant(object):
     # Unit Conversion
     COMMUNICATION_B_TO_GB = 0.001 ** 3
     US_TO_S = 0.001 ** 2
+    TIME_UNIT_SCALE = 1000  # ns -> us
 
     WRITE_MODES = stat.S_IWUSR | stat.S_IRUSR | stat.S_IRGRP
     WRITE_FLAGS = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
@@ -479,4 +482,5 @@ class Constant(object):
     TABLE_TASK_PMU_INFO = "TASK_PMU_INFO"
     TABLE_OP_MEMORY = "OP_MEMORY"
     TABLE_MEMORY_RECORD = "MEMORY_RECORD"
+    TABLE_STEP_TIME = "STEP_TIME"
diff --git a/profiler/msprof_analyze/prof_exports/cluster_time_summary_export.py b/profiler/msprof_analyze/prof_exports/cluster_time_summary_export.py
index 36b86301dc73ffb8e6a206f9b61bac2b159fc747..f8a1a0d6f203a5832ec35953d88d6eb8a5f27a7d 100644
--- a/profiler/msprof_analyze/prof_exports/cluster_time_summary_export.py
+++ b/profiler/msprof_analyze/prof_exports/cluster_time_summary_export.py
@@ -36,3 +36,83 @@ class CommunicationTimeExport(BaseStatsExport):
         super().__init__(db_path, recipe_name, step_range)
         filter_statement = "WHERE CANN_API.startNs >= ? and CANN_API.startNs <= ?" if step_range else ""
         self._query = self.QUERY.format(filter_statement)
+
+
+class CommunicationOpWithStepExport(BaseStatsExport):
+    QUERY = """
+    SELECT
+        RANK_DEVICE_MAP.rankId AS rank,
+        si_group.value AS groupName,
+        si_op.value AS opName,
+        (COMMUNICATION_OP.endNs - COMMUNICATION_OP.startNs) / 1000.0 AS communication_time,
+        step_time.id AS step
+    FROM COMMUNICATION_OP
+    CROSS JOIN RANK_DEVICE_MAP
+    JOIN STRING_IDS si_group ON COMMUNICATION_OP.groupName = si_group.id
+    JOIN STRING_IDS si_op ON COMMUNICATION_OP.opName = si_op.id
+    JOIN CANN_API ON CANN_API.connectionId = COMMUNICATION_OP.connectionId
+    LEFT JOIN STEP_TIME step_time
+        ON CANN_API.startNs >= step_time.startNs AND CANN_API.startNs <= step_time.endNs
+    {}
+    """
+
+    def __init__(self, db_path, recipe_name, step_range):
+        super().__init__(db_path, recipe_name, step_range)
+        filter_statement = "WHERE CANN_API.startNs >= ? and CANN_API.startNs <= ?" if step_range else ""
+        self._query = self.QUERY.format(filter_statement)
+
+
+class MemoryAndDispatchTimeExport(BaseStatsExport):
+    QUERY = """
+    WITH
+        computing AS (
+            SELECT
+                TASK.startNs,
+                TASK.endNs,
+                CANN_API.startNs AS apiStartNs,
+                0 AS type
+            FROM COMPUTE_TASK_INFO
+            JOIN TASK ON COMPUTE_TASK_INFO.globalTaskId = TASK.globalTaskId AND TASK.startNs != TASK.endNs
+            JOIN CANN_API ON CANN_API.connectionId = TASK.connectionId
+        ),
+        communication AS (
+            SELECT
+                COMMUNICATION_OP.startNs,
+                COMMUNICATION_OP.endNs,
+                CANN_API.startNs AS apiStartNs,
+                1 AS type
+            FROM COMMUNICATION_OP
+            JOIN CANN_API ON CANN_API.connectionId = COMMUNICATION_OP.connectionId
+        ),
+        memory AS (
+            SELECT
+                TASK.startNs,
+                TASK.endNs,
+                TASK.startNs AS apiStartNs,
+                4 AS type
+            FROM TASK
+            WHERE taskType = (SELECT id FROM STRING_IDS WHERE value='MEMCPY_ASYNC')
+        ),
+        overlap AS (
+            SELECT startNs, endNs, apiStartNs, type FROM computing
+            UNION ALL SELECT startNs, endNs, apiStartNs, type FROM communication
+            UNION ALL SELECT startNs, endNs, apiStartNs, type FROM memory
+        )
+    SELECT
+        overlap.startNs AS start,
+        overlap.endNs AS end,
+        overlap.type,
+        step_time.id AS step
+    FROM overlap
+    LEFT JOIN STEP_TIME step_time
+        ON overlap.apiStartNs >= step_time.startNs
+        AND overlap.apiStartNs <= step_time.endNs
+    {}
+    ORDER BY overlap.startNs, overlap.endNs
+    """
+
+    def __init__(self, db_path, recipe_name, step_range):
+        super().__init__(db_path, recipe_name, step_range)
+        filter_statement = "WHERE overlap.apiStartNs >= ? and overlap.apiStartNs <= ?" if step_range else ""
+        self._query = self.QUERY.format(filter_statement)
+        self.mode = None
diff --git a/profiler/msprof_analyze/test/ut/cluster_analyse/recipes/test_cluster_time_compare_summary.py b/profiler/msprof_analyze/test/ut/cluster_analyse/recipes/test_cluster_time_compare_summary.py
new file mode 100644
index 0000000000000000000000000000000000000000..d6379e4bdedf038277d3afdf33fae03a8ce01e1d
--- /dev/null
+++ b/profiler/msprof_analyze/test/ut/cluster_analyse/recipes/test_cluster_time_compare_summary.py
@@ -0,0 +1,155 @@
+# Copyright (c) 2025, Huawei Technologies Co., Ltd.
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import unittest
+from unittest import mock
+
+import pandas as pd
+
+from msprof_analyze.cluster_analyse.recipes.cluster_time_compare_summary.cluster_time_compare_summary import \
+    ClusterTimeCompareSummary
+from msprof_analyze.prof_common.constant import Constant
+
+NAMESPACE = "msprof_analyze.prof_common"
+
+
+class TestClusterTimeCompareSummary(unittest.TestCase):
+    PARAMS = {
+        Constant.COLLECTION_PATH: "/data",
+        Constant.DATA_MAP: {},
+        Constant.DATA_TYPE: Constant.DB,
+        Constant.CLUSTER_ANALYSIS_OUTPUT_PATH: "./test_cluster_time_compare_summary",
+        Constant.RECIPE_NAME: "ClusterTimeCompareSummary",
+        Constant.RECIPE_CLASS: ClusterTimeCompareSummary,
+        Constant.PARALLEL_MODE: Constant.CONCURRENT_MODE,
+        Constant.EXPORT_TYPE: Constant.DB,
+        ClusterTimeCompareSummary.RANK_LIST: Constant.ALL,
+    }
+
+    def test_check_params_is_valid_should_return_false_when_bp_param_does_not_exist(self):
+        params = {}
+        params.update(self.PARAMS)
+        self.assertFalse(ClusterTimeCompareSummary(params).check_params_is_valid())
+
+    def test_check_params_is_valid_should_return_false_when_export_type_is_notebook(self):
+        params = {Constant.EXTRA_ARGS: ["--bp", "/data2"]}
+        params.update(self.PARAMS)
+        params[Constant.EXPORT_TYPE] = Constant.NOTEBOOK
+        self.assertFalse(ClusterTimeCompareSummary(params).check_params_is_valid())
+
+    def test_check_params_is_valid_should_return_false_when_base_path_is_invalid(self):
+        params = {Constant.EXTRA_ARGS: ["--bp", "/data2"]}
+        params.update(self.PARAMS)
+        with mock.patch(NAMESPACE + ".path_manager.PathManager.check_input_directory_path",
+                        side_effect=RuntimeError):
+            self.assertFalse(ClusterTimeCompareSummary(params).check_params_is_valid())
+
+    def test_check_params_is_valid_should_return_false_when_table_cluster_time_summary_does_not_exist(self):
+        params = {Constant.EXTRA_ARGS: ["--bp", "/data2"]}
+        params.update(self.PARAMS)
+        with mock.patch(NAMESPACE + ".path_manager.PathManager.check_input_directory_path"), \
+                mock.patch(NAMESPACE + ".db_manager.DBManager.check_tables_in_db", return_value=False):
+            self.assertFalse(ClusterTimeCompareSummary(params).check_params_is_valid())
+
+    def test_check_params_is_valid_should_return_false_when_base_table_cluster_time_summary_does_not_exist(self):
+        params = {Constant.EXTRA_ARGS: ["--bp", "/data2"]}
+        params.update(self.PARAMS)
+        with mock.patch(NAMESPACE + ".path_manager.PathManager.check_input_directory_path"), \
+                mock.patch(NAMESPACE + ".db_manager.DBManager.check_tables_in_db", side_effect=[True, False]):
+            self.assertFalse(ClusterTimeCompareSummary(params).check_params_is_valid())
+
+    def test_run_when_all_parameters_are_normal(self):
+        params = {Constant.EXTRA_ARGS: ["--bp", "/data2"]}
+        params.update(self.PARAMS)
+        params[Constant.EXPORT_TYPE] = Constant.DB
+        data_base = [0.5, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5, 10.5, 11.5, 12.5, 13.5]
+        data = [1.6, 2.6, 3.6, 4.6, 5.6, 6.6, 7.6, 8.6, 9.6, 10.6, 11.6, 12.6, 13.6, 14.6, 15.6, 16.6]
+        data1 = [1.6, 2.6, 3.6, 4.6, 5.6, 6.6, 7.6, 8.6, 9.6, 10.6, 11.6, 12.6, 13.6, 14.6]
+        data_diff = [1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1]
+        base_cluster_time_summary_df_dict = {
+            Constant.TABLE_CLUSTER_TIME_SUMMARY: pd.DataFrame(
+                {
+                    "rank": [0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6],
+                    "step": [0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1],
+                    "stepTime": data_base,
+                    "computation": data_base,
+                    "communicationNotOverlapComputation": data_base,
+                    "communicationOverlapComputation": data_base,
+                    "communication": data_base,
+                    "free": data_base,
+                    "communicationWaitStageTime": data_base,
+                    "communicationTransmitStageTime": data_base,
+                    "memory": data_base,
+                    "memoryNotOverlapComputationCommunication": data_base,
+                }
+            )
+        }
+        cluster_time_summary_df_dict = {
+            Constant.TABLE_CLUSTER_TIME_SUMMARY: pd.DataFrame(
+                {
+                    "rank": [0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7],
+                    "step": [0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1],
+                    "stepTime": data,
+                    "computation": data,
+                    "communicationNotOverlapComputation": data,
+                    "communicationOverlapComputation": data,
+                    "communication": data,
+                    "free": data,
+                    "communicationWaitStageTime": data,
+                    "communicationTransmitStageTime": data,
+                    "memory": data,
+                    "memoryNotOverlapComputationCommunication": data,
+                }
+            )
+        }
+        expected_result = pd.DataFrame({
+            "rank": [0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6],
+            "step": [0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1],
+            "stepTime": data1,
+            "stepTimeBase": data_base,
+            "stepTimeDiff": data_diff,
+            "computation": data1,
+            "computationBase": data_base,
+            "computationDiff": data_diff,
+            "communicationNotOverlapComputation": data1,
+            "communicationNotOverlapComputationBase": data_base,
+            "communicationNotOverlapComputationDiff": data_diff,
+            "communicationOverlapComputation": data1,
+            "communicationOverlapComputationBase": data_base,
+            "communicationOverlapComputationDiff": data_diff,
+            "communication": data1,
+            "communicationBase": data_base,
+            "communicationDiff": data_diff,
+            "free": data1,
+            "freeBase": data_base,
+            "freeDiff": data_diff,
+            "communicationWaitStageTime": data1,
+            "communicationWaitStageTimeBase": data_base,
+            "communicationWaitStageTimeDiff": data_diff,
+            "communicationTransmitStageTime": data1,
+            "communicationTransmitStageTimeBase": data_base,
+            "communicationTransmitStageTimeDiff": data_diff,
+            "memory": data1,
+            "memoryBase": data_base,
+            "memoryDiff": data_diff,
+            "memoryNotOverlapComputationCommunication": data1,
+            "memoryNotOverlapComputationCommunicationBase": data_base,
+            "memoryNotOverlapComputationCommunicationDiff": data_diff,
+        })
+        with mock.patch(NAMESPACE + ".path_manager.PathManager.check_input_directory_path"), \
+                mock.patch(NAMESPACE + ".db_manager.DBManager.check_tables_in_db", side_effect=[True, True]), \
+                mock.patch(NAMESPACE + ".database_service.DatabaseService.query_data",
+                           side_effect=[cluster_time_summary_df_dict, base_cluster_time_summary_df_dict]):
+            cluster_time_compare_summary = ClusterTimeCompareSummary(params)
+            cluster_time_compare_summary.run()
+            self.assertTrue(cluster_time_compare_summary.compare_result.round(2).equals(expected_result.round(2)))
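For reviewers unfamiliar with the new `ConcurrentContext` API in `context.py`, here is a minimal usage sketch (hypothetical driver code, not part of this patch). It assumes only what the diff shows: `submit(name, func, *args, **kwargs)` groups futures in `future_dict` under `name`, `wait_all_futures()` blocks on all of them, and the class supports the context-manager protocol implied by `__enter__`:

```python
from concurrent import futures

from msprof_analyze.cluster_analyse.common_func.context import ConcurrentContext


def square(value):
    return value * value


if __name__ == "__main__":
    # Group futures under a name, block until all of them finish, then read
    # the results back from future_dict -- the same pattern mapper_func and
    # aggregate_stats use in cluster_time_summary.py.
    with ConcurrentContext(executor=futures.ProcessPoolExecutor(max_workers=2)) as context:
        for i in range(4):
            context.submit("square", square, i)  # futures accumulate under the "square" key
        context.wait_all_futures()  # re-raises the first worker exception, if any
        results = [f.result() for f in context.future_dict["square"]]
        print(results)  # [0, 1, 4, 9]
```

Note that `wait_all_futures()` only calls `future.result()` for its side effects; callers that need the return values read them back from `future_dict`, as `safe_concat` does in `aggregate_stats`.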