diff --git a/profiler/msprof_analyze/cli/cluster_cli.py b/profiler/msprof_analyze/cli/cluster_cli.py index 0cdb2bd2b10b2ede411d10221e36a51e3f015e12..b0c9fb6ec4661bfc6432291f76eeefc07db7767d 100644 --- a/profiler/msprof_analyze/cli/cluster_cli.py +++ b/profiler/msprof_analyze/cli/cluster_cli.py @@ -32,7 +32,7 @@ context_settings['ignore_unknown_options'] = True @click.option('--data_simplification', is_flag=True, help='data simplification switch for db data') @click.option('--force', is_flag=True, help="Indicates whether to skip file size verification and owner verification") @click.option("--parallel_mode", type=str, help="context mode", default="concurrent") -@click.option("--export_type", help="recipe export type", type=click.Choice(["db", "notebook"]), default="db") +@click.option("--export_type", help="recipe export type", type=click.Choice(["db", "notebook", "excel"]), default="db") @click.option("--rank_list", type=str, help="Rank id list", default='all') @click.argument('args', nargs=-1) def cluster_cli(**kwargs) -> None: diff --git a/profiler/msprof_analyze/cluster_analyse/README.md b/profiler/msprof_analyze/cluster_analyse/README.md index e488ab85f76c40da44d188ac421abe7f3b533da1..40515ca88ae21fc03f2cac4b79ae06a2767f68d3 100644 --- a/profiler/msprof_analyze/cluster_analyse/README.md +++ b/profiler/msprof_analyze/cluster_analyse/README.md @@ -54,45 +54,47 @@ experimental_config = torch_npu.profiler._ExperimentalConfig( 参数说明: - | 参数名 | 说明 | 是否必选 | - | --------------------- | ------------------------------------------------------------ | -------- | - | --profiling_path或-d | 性能数据汇集目录。未配置-o参数时,运行分析脚本之后会在该目录下自动创建cluster_analysis_output文件夹,保存分析数据。 | 是 | - | --output_path或-o | 自定义输出路径,运行分析脚本之后会在该目录下自动创建cluster_analysis_output文件夹,保存分析数据。 | 否 | - | --mode或-m | 数据解析模式,取值详见“**--mode参数说明**”表。 | 否 | - | --data_simplification | 数据精简模式。对于数据量过大的性能数据db文件,可以通过配置该参数将数据精简,并提高工具分析效率。配置该参数表示开启数据精简,默认未配置表示关闭。 | 否 | - | --force | 强制执行cluster。配置后可强制跳过如下情况:
指定的目录、文件的用户属主不属于当前用户,忽略属主判断直接执行。
csv文件大于5G、json文件大于10G、db文件大于8G,忽略文件过大判断直接执行。
配置该参数表示开启强制执行,默认未配置表示关闭。 | 否 | - | --parallel_mode | 设置收集多卡、多节点db数据时的并发方式。取值为concurrent(使用concurrent.feature进程池实现并发)。
**只有-m配置cann_api_sum、compute_op_sum、hccl_sum、mstx_sum时可配置此参数。** | 否 | - | --export_type | 设置导出的数据形式。取值为db(.db格式文件)和notebook(Jupyter Notebook文件),默认值为db。
**只有-m配置cann_api_sum、compute_op_sum、hccl_sum、mstx_sum时可配置此参数。** | 否 | - | --rank_list | 对特定Rank上的数据进行统计,默认值为all(表示对所有Rank进行统计),须根据实际卡的Rank ID配置。应配置为大于等于0的整数,若所配置的值大于实际训练所运行的卡的Rank ID,则仅解析合法的RankID的数据,比如当前环境Rank ID为0到7,实际训练运行0到3卡,此时若配置Rank ID为0, 3, 4或不存在的10等其他值,则仅解析0和3。配置示例:--rank_list 0, 1, 2。
**只有-m配置cann_api_sum、compute_op_sum、hccl_sum、mstx_sum时可配置此参数。** | 否 | - | --top_num | 设置TopN耗时的通信算子的数量,默认值为15,配置示例:--top_num 20。
**只有-m配置hccl_sum时可配置此参数。** | 否 | - | --exclude_op_name | 控制compute_op_name结果是否包含op_name,示例:--exclude_op_name,后面不需要跟参数。
**只有-m配置compute_op_sum时可配置此参数。** | 否 | - | --bp | 要对比的标杆集群数据,示例:--bp {bp_cluster_profiling_path},表示profiling_path和bp_cluster_profiling_path的数据进行对比。
**只有-m配置cluster_time_compare_summary时可配置此参数。** | 否 | - + | 参数名 | 说明 | 是否必选 | + | -------------------- |------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -------- | + | --profiling_path或-d | 性能数据汇集目录。未配置-o参数时,运行分析脚本之后会在该目录下自动创建cluster_analysis_output文件夹,保存分析数据。 | 是 | + | --output_path或-o | 自定义输出路径,运行分析脚本之后会在该目录下自动创建cluster_analysis_output文件夹,保存分析数据。 | 否 | + | --mode或-m | 数据解析模式,取值详见“**--mode参数说明**”表。 | 否 | + | --data_simplification | 数据精简模式。对于数据量过大的性能数据db文件,可以通过配置该参数将数据精简,并提高工具分析效率。配置该参数表示开启数据精简,默认未配置表示关闭。 | 否 | + | --force | 强制执行cluster。配置后可强制跳过如下情况:
指定的目录、文件的用户属主不属于当前用户,忽略属主判断直接执行。
csv文件大于5G、json文件大于10G、db文件大于8G,忽略文件过大判断直接执行。
配置该参数表示开启强制执行,默认未配置表示关闭。 | 否 | + | --parallel_mode | 设置收集多卡、多节点db数据时的并发方式。取值为concurrent(使用concurrent.futures进程池实现并发)。<br>
**只有-m配置cann_api_sum、compute_op_sum、hccl_sum、mstx_sum时可配置此参数。** | 否 | + | --export_type | 设置导出的数据形式。取值为db(.db格式文件)、notebook(Jupyter Notebook文件)和excel(excel格式文件),默认值为db。
**只有-m配置cann_api_sum、compute_op_sum、hccl_sum、mstx_sum时可配置此参数。** | 否 | + | --rank_list | 对特定Rank上的数据进行统计,默认值为all(表示对所有Rank进行统计),须根据实际卡的Rank ID配置。应配置为大于等于0的整数,若所配置的值大于实际训练所运行的卡的Rank ID,则仅解析合法的RankID的数据,比如当前环境Rank ID为0到7,实际训练运行0到3卡,此时若配置Rank ID为0, 3, 4或不存在的10等其他值,则仅解析0和3。配置示例:--rank_list 0, 1, 2。
**只有-m配置cann_api_sum、compute_op_sum、hccl_sum、mstx_sum时可配置此参数。** | 否 | + | --top_num | 设置TopN耗时的通信算子的数量,默认值为15,配置示例:--top_num 20。
**只有-m配置hccl_sum时可配置此参数。** | 否 | + | --exclude_op_name | 控制compute_op_name结果是否包含op_name,示例:--exclude_op_name,后面不需要跟参数。
**只有-m配置compute_op_sum时可配置此参数。** | 否 | + | --bp | 要对比的标杆集群数据,示例:--bp {bp_cluster_profiling_path},表示profiling_path和bp_cluster_profiling_path的数据进行对比。
**只有-m配置cluster_time_compare_summary时可配置此参数。** | 否 | + + --mode参数说明: --mode参数设置不同的数据解析模式,可调用不同的分析能力,交付件详细内容请参见[recipe结果和cluster_analysis.db交付件表结构说明](#recipe结果和cluster_analysisdb交付件表结构说明)。 - | 参数名 | 说明 | 是否必选 | - |------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----| - | communication_matrix | 解析通信矩阵数据。 | 否 | - | communication_time | 解析通信耗时数据。 | 否 | - | all | 同时解析通信矩阵communication_matrix和通信耗时数据communication_time,--mode参数默认值为all。 | 否 | - | cann_api_sum | 集群API性能数据汇总分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。--export_type为db时,输出交付件cluster_analysis.db;--export_type为notebook时,在cluster_analysis_output/CannApiSum目录下输出交付件stats.ipynb。 | 否 | - | compute_op_sum | 集群场景性能数据的device运行算子信息汇总分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。--export_type为db时,输出交付件cluster_analysis.db;--export_type为notebook时,在cluster_analysis_output/ComputeOpSum目录下输出交付件stats.ipynb;可根据实际情况决定是否是否打开--exclude_op_name。 | 否 | - | hccl_sum | 集合通信算子耗时分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。--export_type为db时,输出交付件cluster_analysis.db;--export_type为notebook时,在cluster_analysis_output/HcclSum目录下输出交付件stats.ipynb。 | 否 | - | mstx_sum | 集群场景mstx打点信息汇总分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。--export_type为db时,输出交付件cluster_analysis.db;--export_type为notebook时,在cluster_analysis_output/MstxSum目录下输出交付件stats.ipynb。 | 否 | - | slow_link | 集群慢链路异常分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。--export_type为db时,输出交付件cluster_analysis.db;--export_type为notebook时,在cluster_analysis_output/SlowLink目录下输出交付件stats.ipynb。 | 否 | - | cluster_time_summary | 集群场景性能数据分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db和analysis.db文件。--export_type为db时,输出交付件cluster_analysis.db,db里面有ClusterTimeSummary,不支持导出notebook。 | 否 | - | cluster_time_compare_summary | 集群场景性能数据对比分析,使用前集群数据必须先分析cluster_time_summary,需要配合--bp参数使用。输入性能数据需要基于cluster_analysis_output下的cluster_analysis.db文件。--export_type为db时,输出交付件cluster_analysis.db,db文件中有对比结果的表ClusterTimeCompareSummary,不支持导出notebook。 | 否 | - | slow_rank_pp_stage | 集群场景性能数据pp stage通信对比分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。输入性能数据中MetaData表如果没有包含训练任务的并行策略,则需要通过--tp --pp --dp手动传入,数据类型为正整数。--export_type为db时,输出交付件cluster_analysis.db,db文件中有分析结果PPAnalysisResult和P2PAnalysisResult,不支持导出notebook。 | 否 | - | freq_analysis | 集群场景aicore frequency信息汇总分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。打印输出是否aicore存在空闲(频率为800MHz)、异常(频率不为1800MHz或800MHz)的现象。如果有,则在输出交付件cluster_analysis.db增加对应的卡和频率信息。 | 否 | - | ep_load_balance | 集群场景moe负载信息汇总分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。输出交付件cluster_analysis.db增加EPTokensSummary, TopEPTokensInfo分析表格。 | 否 | - | mstx2comm | 基于ascend_pytorch_profiler_{rank_id}.db文件,将通信内置打点数据转换成通信算子。 | 否 | - | slow_rank | 集群场景通信算子快慢卡汇总分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。输出交付件cluster_analysis.db中展示各个rank按照当前的快慢卡统计算法得出的快慢卡影响次数。 | | - | p2p_pairing | 集群场景P2P算子生成全局关联索引,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。输出的关联索引会作为一个新的字段`opConnectionId`附在原性能数据ascend_pytorch_profiler_{rank_id}.db文件的`COMMUNICATION_OP`的表中。 | 否 | - | filter_db | 基于ascend_pytorch_profiler_{rank_id}.db文件,提取通信类大算子数据,计算类关键函数和框架关键函数,节约90%+ 内存,促进快速全局分析。 | 否 | - | pp_chart | 基于打点后的ascend_pytorch_profiler_{rank_id}.db文件,分析打点数据,还原pp流水图 | 否 | - | 自定义分析参数 | 与cann_api_sum、compute_op_sum、hccl_sum等参数功能类似,用户可自定义一套性能数据的分析规则,需要详细了解性能分析的开发人员,具体开发指导请参见“[自定义分析规则开发指导](#自定义分析规则开发指导)”。 | 否 | 
+ | 参数名 | 说明 | 是否必选 | export_type支持类型 |
+ |------------------------------|------------------------------------------------------------|----|-----------------|
+ | communication_matrix | 解析通信矩阵数据。 | 否 | / |
+ | communication_time | 解析通信耗时数据。 | 否 | / |
+ | all | 同时解析通信矩阵communication_matrix和通信耗时数据communication_time,--mode参数默认值为all。 | 否 | / |
+ | cann_api_sum | 集群API性能数据汇总分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。--export_type为db时,输出交付件cluster_analysis.db;--export_type为notebook时,在cluster_analysis_output/CannApiSum目录下输出交付件stats.ipynb。 | 否 | db, notebook |
+ | compute_op_sum | 集群场景性能数据的device运行算子信息汇总分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。--export_type为db时,输出交付件cluster_analysis.db;--export_type为notebook时,在cluster_analysis_output/ComputeOpSum目录下输出交付件stats.ipynb;可根据实际情况决定是否打开--exclude_op_name。 | 否 | db, notebook |
+ | hccl_sum | 集合通信算子耗时分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。--export_type为db时,输出交付件cluster_analysis.db;--export_type为notebook时,在cluster_analysis_output/HcclSum目录下输出交付件stats.ipynb。 | 否 | db, notebook |
+ | mstx_sum | 集群场景mstx打点信息汇总分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。--export_type为db时,输出交付件cluster_analysis.db;--export_type为notebook时,在cluster_analysis_output/MstxSum目录下输出交付件stats.ipynb。 | 否 | db, notebook |
+ | slow_link | 集群慢链路异常分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。--export_type为db时,输出交付件cluster_analysis.db;--export_type为notebook时,在cluster_analysis_output/SlowLink目录下输出交付件stats.ipynb。 | 否 | db, notebook |
+ | cluster_time_summary | 集群场景性能数据分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db和analysis.db文件。--export_type为db时,输出交付件cluster_analysis.db,db里面有ClusterTimeSummary。 | 否 | db |
+ | cluster_time_compare_summary | 集群场景性能数据对比分析,使用前集群数据必须先分析cluster_time_summary,需要配合--bp参数使用。输入性能数据需要基于cluster_analysis_output下的cluster_analysis.db文件。--export_type为db时,输出交付件cluster_analysis.db,db文件中有对比结果的表ClusterTimeCompareSummary。 | 否 | db |
+ | slow_rank_pp_stage | 集群场景性能数据pp stage通信对比分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。输入性能数据中MetaData表如果没有包含训练任务的并行策略,则需要通过--tp --pp --dp手动传入,数据类型为正整数。--export_type为db时,输出交付件cluster_analysis.db,db文件中有分析结果PPAnalysisResult和P2PAnalysisResult。 | 否 | db |
+ | freq_analysis | 集群场景aicore frequency信息汇总分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。打印输出aicore是否存在空闲(频率为800MHz)、异常(频率不为1800MHz或800MHz)的现象。如果有,则在输出交付件cluster_analysis.db增加对应的卡和频率信息。 | 否 | db |
+ | ep_load_balance | 集群场景moe负载信息汇总分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。输出交付件cluster_analysis.db增加EPTokensSummary, TopEPTokensInfo分析表格。 | 否 | db |
+ | mstx2commop | 基于ascend_pytorch_profiler_{rank_id}.db文件,将通信内置打点数据转换成通信算子。 | 否 | db |
+ | slow_rank | 集群场景通信算子快慢卡汇总分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。输出交付件cluster_analysis.db中展示各个rank按照当前的快慢卡统计算法得出的快慢卡影响次数。 | 否 | db |
+ | p2p_pairing | 集群场景P2P算子生成全局关联索引,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。输出的关联索引会作为一个新的字段`opConnectionId`附在原性能数据ascend_pytorch_profiler_{rank_id}.db文件的`COMMUNICATION_OP`的表中。 | 否 | db |
+ | filter_db | 基于ascend_pytorch_profiler_{rank_id}.db文件,提取通信类大算子数据,计算类关键函数和框架关键函数,节约90%+ 内存,促进快速全局分析。 | 否 | db |
+ | pp_chart | 基于打点后的ascend_pytorch_profiler_{rank_id}.db文件,分析打点数据,还原pp流水图。 | 否 | db |
+ | module_statistic | 基于打点后的ascend_pytorch_profiler_{rank_id}.db文件,分析打点数据,拆解模型结构与算子关联,具体指导请参见“[模型结构拆解指南](../docs/features/module_statistic.md)”。 | 否 | db, excel |
+ | 自定义分析参数 | 
与cann_api_sum、compute_op_sum、hccl_sum等参数功能类似,用户可自定义一套性能数据的分析规则,需要详细了解性能分析的开发人员,具体开发指导请参见“[自定义分析规则开发指导](#自定义分析规则开发指导)”。 | 否 | / | --parallel_mode参数示例如下: diff --git a/profiler/msprof_analyze/cluster_analyse/cluster_analysis.py b/profiler/msprof_analyze/cluster_analyse/cluster_analysis.py index 6464bb732ddf57b2790d99ac7148ce3ecaf327ce..43e716efa116b0613fcb8a14fe6555afcdbb5c46 100644 --- a/profiler/msprof_analyze/cluster_analyse/cluster_analysis.py +++ b/profiler/msprof_analyze/cluster_analyse/cluster_analysis.py @@ -140,7 +140,8 @@ def cluster_analysis_main(): parser.add_argument('--force', action='store_true', help="Indicates whether to skip file size verification and owner verification") parser.add_argument("--parallel_mode", type=str, help="context mode", default="concurrent") - parser.add_argument("--export_type", type=str, help="recipe export type", choices=["db", "notebook"], default="db") + parser.add_argument("--export_type", type=str, help="recipe export type", choices=["db", "notebook", "excel"], + default="db") parser.add_argument("--rank_list", type=str, help="Rank id list", default='all') args, extra_args = parser.parse_known_args() diff --git a/profiler/msprof_analyze/cluster_analyse/common_func/excel_utils.py b/profiler/msprof_analyze/cluster_analyse/common_func/excel_utils.py new file mode 100644 index 0000000000000000000000000000000000000000..3ca5308b5963ab9e408adf433711028c06a5fc4e --- /dev/null +++ b/profiler/msprof_analyze/cluster_analyse/common_func/excel_utils.py @@ -0,0 +1,175 @@ +# Copyright (c) 2025, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
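+
+# ExcelUtils基于pandas的xlsxwriter引擎封装xlsx输出的常用操作:写入表头与数据行、设置列宽/行高、
+# 冻结窗格,以及将同一列中连续相同取值的单元格纵向合并(merge_duplicate_cells)。
+# 典型调用顺序:create_excel_writer -> 各类格式/合并设置 -> save_and_close;实例可通过clear()复用。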
+ +from typing import List, Dict, Optional + +import pandas as pd + +from msprof_analyze.prof_common.logger import get_logger + +logger = get_logger() + + +class ExcelUtils: + DEFAULT_FORMAT = { + 'valign': 'vcenter', + 'border': 1, + 'font_name': 'Times New Roman' + } + DEFAULT_HEADER_FORMAT = { + 'valign': 'vcenter', + 'bold': True, + 'border': 1, + 'bg_color': '#AFEEEE', + 'font_name': 'Times New Roman' + } + + def __init__(self): + self.workbook = None + self.writer = None + self.worksheet = None + self.df = None + self._formats_cache = {} + + def clear(self) -> None: + """显式清理资源,允许实例重复使用""" + if self.writer: + self.writer.close() + self.workbook = None + self.writer = None + self.worksheet = None + self.df = None + self._formats_cache = {} + + def create_excel_writer(self, output_file: str, + df: pd.DataFrame, + column_format: Optional[Dict] = None, + header_format: Optional[Dict] = None, + sheet_name: str = 'Sheet1'): + """初始化Excel写入器并写入原始数据""" + self.writer = pd.ExcelWriter(output_file, engine='xlsxwriter') + self.workbook = self.writer.book + self.worksheet = self.workbook.add_worksheet(sheet_name) + self.df = df + + # 写入标题行 + header_fmt = self._get_format(header_format if header_format else self.DEFAULT_HEADER_FORMAT) + for col_idx, col_name in enumerate(df.columns): + self.worksheet.write(0, col_idx, col_name, header_fmt) + + # 写入数据行 + default_fmt = self._get_format(column_format if column_format else self.DEFAULT_FORMAT) + for row_idx, row in df.iterrows(): + for col_idx, col_name in enumerate(df.columns): + self.worksheet.write(row_idx + 1, col_idx, row[col_name], default_fmt) + + def save_and_close(self): + """保存并关闭 Excel 文件""" + if self.writer: + self.writer.close() + self.writer = None + self.workbook = None + self.worksheet = None + + def set_column_width(self, columns_config: Dict[str, int]): + """设置列宽""" + if not self.worksheet: + raise Exception("Worksheet has not been initialized!") + + for col, width in columns_config.items(): + col_idx = list(columns_config.keys()).index(col) + self.worksheet.set_column(col_idx, col_idx, width) + + def set_row_height(self, row: int, height: int): + """设置指定行的行高""" + if not self.worksheet: + raise Exception("Worksheet not initialized") + self.worksheet.set_row(row, height) + + def freeze_panes(self, row: int = 1, col: int = 0): + """冻结窗格""" + if not self.worksheet: + raise Exception("Worksheet has not been initialized!") + self.worksheet.freeze_panes(row, col) + + def merge_duplicate_cells( + self, + columns_to_merge: List[str], + merge_format: Optional[Dict] = None, + header_format: Optional[Dict] = None + ): + """ + 合并连续相同值的单元格 + + 参数: + df: 输入的 DataFrame + columns_to_merge: 需要合并的列名列表 + merge_format: 合并单元格的格式字典 + header_format: 标题行的格式字典 + """ + if not self.workbook or not self.worksheet: + raise Exception("Worksheet has not been initialized!") + + # 设置格式 + merge_fmt = self._get_format(merge_format if merge_format else self.DEFAULT_FORMAT) + header_fmt = self._get_format(header_format if header_format else self.DEFAULT_HEADER_FORMAT) + + # 应用标题行格式 + for col_num, value in enumerate(self.df.columns.values): + self.worksheet.write(0, col_num, value, header_fmt) + + # 遍历需要合并的列 + for col in columns_to_merge: + if col not in self.df.columns: + logger.warning(f"Invalid column: {col}, not in dataframe!") + continue + + col_idx = self.df.columns.get_loc(col) + current_value = None + start_row = 1 # 第一行是标题,从第二行开始 + merge_count = 0 + + for i in range(len(self.df)): + excel_row = i + 1 + + if self.df[col].iloc[i] == current_value: + continue + 
else: + if current_value is not None and (excel_row - 1) > start_row: + self.worksheet.merge_range( + start_row, col_idx, excel_row - 1, col_idx, + current_value, + merge_fmt + ) + merge_count += 1 + + current_value = self.df[col].iloc[i] + start_row = excel_row + + # 处理最后一组连续相同的值 + if current_value is not None and (len(self.df)) > start_row: + self.worksheet.merge_range( + start_row, col_idx, len(self.df), col_idx, + current_value, + merge_fmt + ) + merge_count += 1 + + def _get_format(self, format_dict: Dict): + """获取或创建格式对象(带缓存)""" + format_key = frozenset(format_dict.items()) + if format_key not in self._formats_cache: + self._formats_cache[format_key] = self.workbook.add_format(format_dict) + return self._formats_cache[format_key] \ No newline at end of file diff --git a/profiler/msprof_analyze/cluster_analyse/recipes/module_statistic/__init__.py b/profiler/msprof_analyze/cluster_analyse/recipes/module_statistic/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/profiler/msprof_analyze/cluster_analyse/recipes/module_statistic/module_statistic.py b/profiler/msprof_analyze/cluster_analyse/recipes/module_statistic/module_statistic.py new file mode 100644 index 0000000000000000000000000000000000000000..91a354f81ae0838e8e96688fa47e2a8e45057060 --- /dev/null +++ b/profiler/msprof_analyze/cluster_analyse/recipes/module_statistic/module_statistic.py @@ -0,0 +1,347 @@ +# Copyright (c) 2025, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
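+
+# ModuleStatistic(模型结构拆解)分析流程:先从MSTX_EVENTS导出domain为Module的打点区间,
+# 再通过CONNECTION_IDS将框架侧算子(PYTORCH_API)与device侧kernel关联;按时间区间的包含关系
+# 构建Module/算子/kernel事件树,展平后统计每个算子对应的kernel序列与耗时,
+# 结果按--export_type写入cluster_analysis.db的ModuleStatistic表,或每卡生成一个xlsx文件。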
+ +import os +from collections import defaultdict, deque + +import pandas as pd + +from msprof_analyze.cluster_analyse.common_func.excel_utils import ExcelUtils +from msprof_analyze.prof_common.db_manager import DBManager +from msprof_analyze.prof_exports.module_statistic_export import FrameworkOpToKernelExport, ModuleMstxRangeExport +from msprof_analyze.cluster_analyse.recipes.base_recipe_analysis import BaseRecipeAnalysis +from msprof_analyze.prof_common.constant import Constant +from msprof_analyze.prof_common.logger import get_logger + +logger = get_logger() + + +class NodeType: + MODULE_EVENT_NODE = 0 + CPU_OP_EVENT = 1 + KERNEL_EVENT = 2 + + +class TreeNode: + def __init__(self, start, end, node_type, name): + self.start = start + self.end = end + self.node_type = node_type + self.name = name + self.children = [] + + def add_child(self, node): + self.children.append(node) + + +class ModuleStatistic(BaseRecipeAnalysis): + TABLE_MODULE_STATISTIC = "ModuleStatistic" + KERNEL_RELATED_TABLE_LIST = [Constant.TABLE_COMPUTE_TASK_INFO, Constant.TABLE_COMMUNICATION_OP, + Constant.TABLE_COMMUNICATION_SCHEDULE_TASK_INFO] + MODULE_DOMAIN_NAME = 'Module' + MAX_TRAVERSE_DEPTH = 20 + + EXCEL_COLUMN_WIDTH_CONFIG = {"Parent Module": 40, + "Module": 40, + "Op Name": 40, + "Kernel List": 60, + "Total Kernel Duration(ns)": 15, + "Avg Kernel Duration(ns)": 15, + "Op Count": 15} + EXCEL_COLUMN_ALIGN_CONFIG = {"Total Kernel Duration(ns)": 'center', + "Avg Kernel Duration(ns)": 'center', + "Op Count": 'center'} + + def __init__(self, params): + super().__init__(params) + + @property + def base_dir(self): + return os.path.basename(os.path.dirname(__file__)) + + def run(self, context): + if self._export_type != Constant.DB and self._export_type != Constant.EXCEL: + logger.error(f"Invalid export type: {self._export_type} for module analysis, " + f"required to be {Constant.DB} or {Constant.EXCEL}") + mapper_res = self.mapper_func(context) + if self._export_type == Constant.DB: + total_df = self.reducer_func(mapper_res) + self.save_db(total_df) + elif self._export_type == Constant.EXCEL: + self.save_csv(mapper_res) + + def reducer_func(self, mapper_res): + valid_dfs = [stat_df.assign(rankID=rank_id) + for rank_id, stat_df in mapper_res + if not stat_df.empty] + return pd.concat(valid_dfs, ignore_index=True) if valid_dfs else None + + def save_db(self, df): + if df is None or df.empty: + logger.warning(f"No module analysis result, skipping dump data") + return + self.dump_data(df, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER, + self.TABLE_MODULE_STATISTIC, index=False) + + def save_csv(self, mapper_res): + columns_to_merge = ['Parent Module', 'Module'] + column_width_config = {"Parent Module": 40, + "Module": 40, + "Op Name": 40, + "Kernel List": 50, + "Total Kernel Duration(ns)": 10, + "Avg Kernel Duration(ns)": 10, + "Op Count": 10} + excel_utils = ExcelUtils() + for rank_id, stat_df in mapper_res: + if stat_df.empty: + logger.warning(f"No module analysis result for rank {rank_id}, skipping dump data") + continue + file_name = f"module_statistic_{rank_id}.xlsx" + file_path = os.path.join(self.output_path, file_name) + try: + excel_utils.create_excel_writer(file_path, stat_df) + excel_utils.merge_duplicate_cells(columns_to_merge) + excel_utils.set_column_width(column_width_config) + excel_utils.set_row_height(0, 27) # 标题行行高27 + excel_utils.save_and_close() + excel_utils.clear() + except Exception as e: + logger.error(f"Save excel failed, err: {e}") + + def _mapper_func(self, data_map, analysis_class): + """ + + 
kernel_df = pd.DataFrame() + """ + # Query Data + profiler_db_path = data_map.get(Constant.PROFILER_DB_PATH) + rank_id = data_map.get(Constant.RANK_ID) + module_export = ModuleMstxRangeExport(profiler_db_path, self._recipe_name, domain_name=self.MODULE_DOMAIN_NAME) + module_df = module_export.read_export_db() # module_df columns:["startNs", "endNs", "name"] + if module_df is None or module_df.empty: + logger.error(f"Can not export mstx range event with domain: {self.MODULE_DOMAIN_NAME}, from rank {rank_id}") + return rank_id, pd.DataFrame() + # kernel_df columns:["kernel_name", "kernel_ts", "kernel_end", "op_name", "op_ts", "op_end"] + kernel_df = self._query_framework_op_to_kernel(profiler_db_path) + if kernel_df is None or kernel_df.empty: + logger.error(f"Can not export framework op to kernel mapper from rank {rank_id}") + return rank_id, pd.DataFrame() + # Convert time related columns' type to be int + mstx_time_columns = ['startNs', 'endNs'] + module_df[mstx_time_columns] = module_df[mstx_time_columns].astype(int) + kernel_time_columns = ['kernel_ts', 'kernel_end', 'op_ts', 'op_end'] + kernel_df[kernel_time_columns] = kernel_df[kernel_time_columns].astype(int) + + # Build Tree + root = self._build_node_tree(module_df, kernel_df) + if not root: + logger.error(f"Empty event tree for rank {rank_id}") + return rank_id, pd.DataFrame() + # Convert to Dataframe + verbose_df = self._flatten_tree_to_dataframe(root) + if verbose_df.empty: + logger.error(f"Failed to extract event tree data for rank {rank_id}") + return rank_id, pd.DataFrame() + # 区分module与算子序列,并计算device_time平均值 + stat_df = self._aggregate_module_operator_stats(verbose_df.copy()) + return rank_id, stat_df + + def _query_framework_op_to_kernel(self, profiler_db_path): + valid_dfs = [] + for table_name in self.KERNEL_RELATED_TABLE_LIST: + if not DBManager.check_tables_in_db(profiler_db_path, table_name): + continue + export = FrameworkOpToKernelExport(profiler_db_path, self._recipe_name, table_name) + df = export.read_export_db() + if df is not None and not df.empty: + valid_dfs.append(df) + if not valid_dfs: + return None + try: + return pd.concat(valid_dfs, ignore_index=True) + except Exception as e: + logger.error(f"Failed to concatenate framework op to kernel dataframes: {str(e)}") + return None + + def _build_node_tree(self, module_df, kernel_df): + nodes = [] + + # 1. 创建mstx节点 + for _, row in module_df.iterrows(): + nodes.append(TreeNode( + row['startNs'], + row['endNs'], + NodeType.MODULE_EVENT_NODE, + row['name'] + )) + + # 2. 按op_name分组处理kernel数据 + op_groups = defaultdict(list) + for _, row in kernel_df.iterrows(): + op_groups[(row['op_name'], row['op_ts'], row['op_end'])].append(row) + + # 3. 创建op节点和对应的kernel节点 + for (op_name, op_ts, op_end), kernels in op_groups.items(): + # 创建op节点 + op_node = TreeNode( + op_ts, + op_end, + NodeType.CPU_OP_EVENT, + op_name + ) + + # 为每个op添加对应的kernel节点 + for kernel in kernels: + kernel_node = TreeNode( + kernel['kernel_ts'], + kernel['kernel_end'], + NodeType.KERNEL_EVENT, + kernel['kernel_name'] + ) + op_node.add_child(kernel_node) + + nodes.append(op_node) + + if not nodes: + logger.error(f"Empty node (module_event/cpu_op/kernel), skipping tree build") + return None + + # 4. 按开始时间升序、结束时间降序排序 + nodes.sort(key=lambda x: (x.start, -x.end)) + + # 5. 
构建树结构 + root = self._create_root_node(module_df, kernel_df) + stack = [root] + + for node in nodes: + # 找到最近的能包含当前节点的父节点 + while stack[-1].start > node.start or stack[-1].end < node.end: + stack.pop() + + # 添加到父节点的children中 + stack[-1].add_child(node) + stack.append(node) + + return root + + def _create_root_node(self, module_df, kernel_df): + global_start = min(module_df['startNs'].min(), kernel_df['kernel_ts'].min(), kernel_df['op_ts'].min()) + global_end = max(module_df['endNs'].max(), kernel_df['kernel_end'].max(), kernel_df['op_end'].max()) + root = TreeNode(global_start, global_end, NodeType.MODULE_EVENT_NODE, "") + return root + + def _flatten_tree_to_dataframe(self, root_node): + results = [] + + def traverse(node, module_deque, depth=0): + if depth > self.MAX_TRAVERSE_DEPTH: + logger.warning(f"Max traversal depth {self.MAX_TRAVERSE_DEPTH} reached, traversal stopped. " + f"Some information may be lost") + return + if node.node_type != NodeType.MODULE_EVENT_NODE: + return + for op_child in node.children: + if op_child.node_type == NodeType.MODULE_EVENT_NODE: + module_deque.append(node.name) + traverse(op_child, module_deque, depth + 1) + module_deque.pop() + if op_child.node_type == NodeType.CPU_OP_EVENT: + module = node.name + module_parent = "/".join(module_deque) + # 跳过没有module归属的op + if not module and not module_parent: + continue + # 收集该op下的所有kernel信息 + kernel_names = [] + total_device_time = 0.0 + for kernel_child in op_child.children: + if kernel_child.node_type == NodeType.KERNEL_EVENT: + kernel_names.append(kernel_child.name) + duration = kernel_child.end - kernel_child.start + total_device_time += duration + results.append({ + 'module_parent': module_parent, + 'module': module, + 'module_start': node.start, + 'module_end': node.end, + 'op_name': op_child.name, + 'op_start': op_child.start, + 'op_end': op_child.end, + 'kernel_list': ', '.join(kernel_names), + 'device_time': total_device_time + }) + + traverse(root_node, deque(), 0) + if not results: + return pd.DataFrame() + # 转换为DataFrame并排序 + df = pd.DataFrame(results) + df = df.sort_values(by=['module_start', 'op_start'], ascending=[True, True]) + return df + + def _aggregate_module_operator_stats(self, df): + if df is None or df.empty: + logger.warning("Empty dataframe received for aggregation") + return pd.DataFrame() + + # 为每个算子添加在module下的顺序位置 + distinct_module_columns = ['module_parent', 'module', 'module_start', 'module_end'] + df['op_order'] = df.groupby(distinct_module_columns).cumcount() + # 创建seq_key保证唯一性,并分配ID + op_seq = df.groupby(distinct_module_columns)['op_name'].transform(lambda x: '/'.join(x)) + df['seq_key'] = df['module_parent'] + "|" + df['module'] + "|" + op_seq + df['seq_id'] = pd.factorize(op_seq)[0] + df.drop(columns=['seq_key'], inplace=True) + + stat_df = ( + df.groupby(['module_parent', 'module', 'op_name', 'op_order', 'kernel_list', 'seq_id']) + .agg( + module_start=('module_start', 'first'), # 取第一个 module_start + module_end=('module_end', 'first'), # 取第一个 module_end + total_kernel_duration=('device_time', 'sum'), # 计算 device_time 的总时间 + avg_kernel_duration=('device_time', 'mean'), # 计算 device_time 的平均时间 + op_count=('device_time', 'count') # 计算每组的行数(计数) + ).reset_index() + ) + stat_df = (stat_df.sort_values(by=['module_start', 'op_order']). 
+ drop(columns=['module_start', 'module_end', 'seq_id', 'op_order']).reset_index(drop=True)) + + return self._format_stat_df_columns(stat_df) + + def _format_stat_df_columns(self, stat_df): + try: + if self._export_type == Constant.DB: + stat_df = stat_df.rename(columns={ + 'module_parent': 'parentModule', + 'op_name': 'opName', + 'kernel_list': 'kernelList', + 'op_count': 'opCount', + 'total_kernel_duration': 'totalKernelDuration(ns)', + 'avg_kernel_duration': 'avgKernelDuration(ns)'}) + elif self._export_type == Constant.EXCEL: + stat_df = stat_df.rename(columns={ + 'module_parent': 'Parent Module', + 'module': 'Module', + 'op_name': 'Op Name', + 'kernel_list': 'Kernel List', + 'op_count': 'Op Count', + 'total_kernel_duration': 'Total Kernel Duration(ns)', + 'avg_kernel_duration': 'Avg Kernel Duration(ns)'}) + except Exception as e: + logger.error(f"Failed to format statistic data's title, error message: {e}") + return pd.DataFrame() + return stat_df + diff --git a/profiler/msprof_analyze/docs/features/img/vllm_module_statistic.png b/profiler/msprof_analyze/docs/features/img/vllm_module_statistic.png new file mode 100644 index 0000000000000000000000000000000000000000..1ae07065dbd22c13d84de39bd2441820746e3048 Binary files /dev/null and b/profiler/msprof_analyze/docs/features/img/vllm_module_statistic.png differ diff --git a/profiler/msprof_analyze/docs/features/module_statistic.md b/profiler/msprof_analyze/docs/features/module_statistic.md new file mode 100644 index 0000000000000000000000000000000000000000..bb7965831973fca6392802f25f4876860eee40ac --- /dev/null +++ b/profiler/msprof_analyze/docs/features/module_statistic.md @@ -0,0 +1,171 @@ +# 模型结构拆解指南 + +## 简介 + +msprof-analyze提供了针对PyTorch模型自动解析模型层级结构的分析能力(module_statistic),帮助精准定位性能瓶颈,为模型优化提供关键洞察。该分析能力提供: + +* 模型结构拆解:自动提取并展示模型的层次化结构,以及模型中的算子调用顺序 +* 算子与Kernel映射:框架层算子下与NPU上执行Kernel的映射关系 +* 性能分析:精确统计并输出Device侧Kernel的执行耗时 + + +## 操作指导 +### 性能数据准备 +#### 1. 添加mstx打点 + +在模型代码中调用`torch_npu.npu.mstx.range_start/range_end`性能打点接口,需重写PyTorch中的nn.Module调用逻辑 + +#### 2. 
配置并采集 Profiling 数据
+
+* 使用`torch_npu.profiler`接口采集性能数据
+* 在`torch_npu.profiler._ExperimentalConfig`中设置`msprof_tx=True`,开启打点事件采集
+* 在`torch_npu.profiler._ExperimentalConfig`中设置`export_type`导出类型,需要包含db
+* 性能数据落盘在`torch_npu.profiler.tensorboard_trace_handler`接口指定的路径下,将该路径下的数据作为msprof-analyze cluster的输入数据
+
+完整样例代码,详见[性能数据采集样例代码](#性能数据采集样例代码)
+
+### 执行分析命令
+
+使用以下命令启用分析功能:
+```
+msprof-analyze cluster -m module_statistic -d ./result --export_type excel
+```
+参数说明:
+* `-d`: 集群性能数据路径
+* `--export_type`: 导出类型设置,可选db、excel
+* 其余参数:与cluster集群分析功能支持的参数一致,详见[参数列表](../../cluster_analyse/README.md)
+
+### 输出说明
+输出结果体现模型层级、算子调用顺序、NPU上执行的Kernel以及统计耗时。
+* `export_type`设置为`excel`时,每张卡生成独立的module_statistic_{rank_id}.xlsx文件,如下图所示:
+![vllm_module_statistic](img/vllm_module_statistic.png)
+
+* `export_type`设置为`db`时,结果统一保存到 cluster_analysis.db 的 ModuleStatistic 表中,字段说明如下:
+
+ | 字段名称 | 类型 | 说明 |
+ |---------------------|---------|-----------------------------------|
+ | parentModule | TEXT | 上层Module名称 |
+ | module | TEXT | 最底层Module名称 |
+ | opName | TEXT | 框架侧算子名称,同一module下,算子按照调用顺序排列 |
+ | kernelList | TEXT | 框架侧算子下发到Device侧执行Kernel的序列 |
+ | totalKernelDuration | REAL | 框架侧算子对应Device侧Kernel运行总时间,单位纳秒(ns) |
+ | avgKernelDuration | REAL | 框架侧算子对应Device侧Kernel平均运行时间,单位纳秒(ns) |
+ | opCount | INTEGER | 框架侧算子在采集周期内运行的次数 |
+ | rankID | INTEGER | 集群场景的节点识别ID,集群场景下设备的唯一标识 |
+
+
+
+## 附录
+### 性能数据采集样例代码
+
+对于复杂模型结构,建议采用选择性打点策略以降低性能开销,核心性能打点实现代码如下:
+```
+module_list = ["Attention", "QKVParallelLinear"]
+def custom_call(self, *args, **kwargs):
+    module_name = self.__class__.__name__
+    if module_name not in module_list:
+        return original_call(self, *args, **kwargs)
+    mstx_id = torch_npu.npu.mstx.range_start(module_name, domain="Module")
+    tmp = original_call(self, *args, **kwargs)
+    torch_npu.npu.mstx.range_end(mstx_id, domain="Module")
+    return tmp
+```
+完整样例代码如下:
+```
+import random
+import torch
+import torch_npu
+import torch.nn as nn
+import torch.optim as optim
+
+
+original_call = nn.Module.__call__
+
+def custom_call(self, *args, **kwargs):
+    """自定义nn.Module调用方法,添加MSTX打点"""
+    module_name = self.__class__.__name__
+    mstx_id = torch_npu.npu.mstx.range_start(module_name, domain="Module")
+    tmp = original_call(self, *args, **kwargs)
+    torch_npu.npu.mstx.range_end(mstx_id, domain="Module")
+    return tmp
+
+
+class RMSNorm(nn.Module):
+    def __init__(self, dim: int, eps: float = 1e-6):
+        super().__init__()
+        self.eps = eps
+        self.weight = nn.Parameter(torch.ones(dim))
+
+    def _norm(self, x):
+        return x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)
+
+    def forward(self, x):
+        output = self._norm(x.float()).type_as(x)
+        return output * self.weight
+
+
+class ToyModel(nn.Module):
+    def __init__(self, D_in, H, D_out):
+        super(ToyModel, self).__init__()
+        self.input_linear = torch.nn.Linear(D_in, H)
+        self.middle_linear = torch.nn.Linear(H, H)
+        self.output_linear = torch.nn.Linear(H, D_out)
+        self.rms_norm = RMSNorm(D_out)
+
+    def forward(self, x):
+        h_relu = self.input_linear(x).clamp(min=0)
+        for i in range(3):
+            h_relu = self.middle_linear(h_relu).clamp(min=random.random())
+        y_pred = self.output_linear(h_relu)
+        y_pred = self.rms_norm(y_pred)
+        return y_pred
+
+
+def train():
+    # 替换默认调用方法
+    nn.Module.__call__ = custom_call
+    N, D_in, H, D_out = 256, 1024, 4096, 64
+
+    torch.npu.set_device(6)
+    input_data = torch.randn(N, D_in).npu()
+    labels = torch.randn(N, D_out).npu()
+    model = ToyModel(D_in, H, D_out).npu()
+
+    loss_fn = nn.MSELoss()
+    optimizer = optim.SGD(model.parameters(), lr=0.001)
+
+    
experimental_config = torch_npu.profiler._ExperimentalConfig( + aic_metrics=torch_npu.profiler.AiCMetrics.PipeUtilization, + profiler_level=torch_npu.profiler.ProfilerLevel.Level2, + l2_cache=False, + msprof_tx=True, # 打开msprof_tx采集 + data_simplification=False, + export_type=['text', 'db'] # 导出类型中必须要包含db + ) + + prof = torch_npu.profiler.profile( + activities=[torch_npu.profiler.ProfilerActivity.CPU, torch_npu.profiler.ProfilerActivity.NPU], + schedule=torch_npu.profiler.schedule(wait=1, warmup=1, active=3, repeat=1, skip_first=5), + on_trace_ready=torch_npu.profiler.tensorboard_trace_handler("./result"), + record_shapes=True, + profile_memory=False, + with_stack=False, + with_flops=False, + with_modules=True, + experimental_config=experimental_config) + prof.start() + + for i in range(12): + optimizer.zero_grad() + outputs = model(input_data) + loss = loss_fn(outputs, labels) + loss.backward() + optimizer.step() + prof.step() + + prof.stop() + + +if __name__ == "__main__": + train() +``` diff --git a/profiler/msprof_analyze/prof_common/constant.py b/profiler/msprof_analyze/prof_common/constant.py index 5601a9f77fe0332d8258664a40aa449fb6aafbd7..b059611372a01b5b5c4e23a6575edc965bc76f36 100644 --- a/profiler/msprof_analyze/prof_common/constant.py +++ b/profiler/msprof_analyze/prof_common/constant.py @@ -141,9 +141,11 @@ class Constant(object): TABLE_STRING_IDS = "STRING_IDS" TABLE_TASK = "TASK" TABLE_TASK_MPU_INFO = "TASK_MPU_INFO" + TABLE_COMMUNICATION_SCHEDULE_TASK_INFO = "COMMUNICATION_SCHEDULE_TASK_INFO" # export_type NOTEBOOK = "notebook" + EXCEL = "excel" # db name DB_COMMUNICATION_ANALYZER = "analysis.db" @@ -480,3 +482,6 @@ class Constant(object): UINT32_MASK = 0xffffffff INVALID_RANK_NUM = 4294967295 + + SQL_PLACEHOLDER_PATTERN = r"\?|\%s" + diff --git a/profiler/msprof_analyze/prof_exports/base_stats_export.py b/profiler/msprof_analyze/prof_exports/base_stats_export.py index 59d58bdff5485a6ace0f2c12dadbf543ecd4b978..786e139d2c3d26efaf229b05cf0cab957906a192 100644 --- a/profiler/msprof_analyze/prof_exports/base_stats_export.py +++ b/profiler/msprof_analyze/prof_exports/base_stats_export.py @@ -13,6 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import re +from typing import List + import pandas as pd from msprof_analyze.prof_common.db_manager import DBManager @@ -28,21 +31,34 @@ class BaseStatsExport: self._db_path = db_path self._analysis_class = analysis_class self._query = None + self._param = None self.mode = Constant.ANALYSIS def get_query(self): return self._query + def set_params(self, param: List): + if not isinstance(param, List) or not param: + logger.error("Export param type must be List and not empty") + return + self._param = param + def read_export_db(self): try: + if not self._db_path: + logger.error("db path is None.") + return None query = self.get_query() if query is None: logger.error("query is None.") return None - conn, cursor = DBManager.create_connect_db(self._db_path, self.mode) - data = pd.read_sql(query, conn) + conn, cursor = DBManager.create_connect_db(self._db_path, Constant.ANALYSIS) + if self._param is not None and re.search(Constant.SQL_PLACEHOLDER_PATTERN, query): + data = pd.read_sql(query, conn, params=self._param) + else: + data = pd.read_sql(query, conn) DBManager.destroy_db_connect(conn, cursor) return data except Exception as e: logger.error(f"File {self._db_path} read failed error: {e}") - return None \ No newline at end of file + return None diff --git a/profiler/msprof_analyze/prof_exports/module_statistic_export.py b/profiler/msprof_analyze/prof_exports/module_statistic_export.py new file mode 100644 index 0000000000000000000000000000000000000000..800a142d629cc1f5cd523e1dd6d9382bf6d15f40 --- /dev/null +++ b/profiler/msprof_analyze/prof_exports/module_statistic_export.py @@ -0,0 +1,113 @@ +# Copyright (c) 2025, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
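+
+# 本文件提供module_statistic分析所需的两类SQL导出:
+# FrameworkOpToKernelExport通过CONNECTION_IDS将TASK/COMMUNICATION_OP等device侧任务与PYTORCH_API关联,
+# 得到框架算子到kernel的映射;ModuleMstxRangeExport按domain过滤MSTX_EVENTS中的range打点(eventType=2),
+# domain取值通过参数化SQL(set_params)传入,避免字符串拼接。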
+from msprof_analyze.prof_common.constant import Constant +from msprof_analyze.prof_common.logger import get_logger +from msprof_analyze.prof_exports.base_stats_export import BaseStatsExport + + +logger = get_logger() + + +QUERY_COMPUTE_TASK = """ + WITH task_connections AS ( + SELECT + str.value AS name, + task.startNs, + task.endNs, + conn.id AS api_conn_id + FROM + {compute_table} AS compute + LEFT JOIN + TASK task ON compute.globalTaskId = task.globalTaskId + LEFT JOIN + STRING_IDS str ON str.id = compute.name + LEFT JOIN + CONNECTION_IDS conn ON conn.connectionId = task.connectionId + )""" + +QUERY_COMMUNICATION_TASK = """ + WITH task_connections AS ( + SELECT + str.value AS name, + comm.startNs, + comm.endNs, + conn.id AS api_conn_id + FROM + COMMUNICATION_OP AS comm + JOIN + STRING_IDS str ON str.id = comm.opType + JOIN + CONNECTION_IDS conn ON conn.connectionId = comm.connectionId + )""" + + +QUERY_TASK_LINK_PYTORCH_API = """ + SELECT + tc.name as kernel_name, + tc.startNs as kernel_ts, + tc.endNs as kernel_end, + api_str.value AS op_name, + api.startNs as op_ts, + api.endNs as op_end + FROM + task_connections tc + JOIN + PYTORCH_API api ON tc.api_conn_id = api.connectionId + JOIN + STRING_IDS api_str ON api.name = api_str.id + ORDER BY op_ts, kernel_ts +""" + + + +QUERY_MSTX_RANGE_WITH_DOMAIN = """ + SELECT + mstx.startNs, + mstx.endNs, + str_name.value AS name + FROM + MSTX_EVENTS mstx + LEFT JOIN + STRING_IDS str_name ON mstx.message = str_name.id + LEFT JOIN + STRING_IDS str_domain ON mstx.domainId = str_domain.id + WHERE + mstx.eventType = 2 + {} + ORDER BY mstx.startNs +""" + + +class FrameworkOpToKernelExport(BaseStatsExport): + + def __init__(self, db_path, recipe_name, table_name): + super().__init__(db_path, recipe_name) + if table_name in [Constant.TABLE_COMPUTE_TASK_INFO, Constant.TABLE_COMMUNICATION_SCHEDULE_TASK_INFO]: + self._query = (QUERY_COMPUTE_TASK + QUERY_TASK_LINK_PYTORCH_API).format(compute_table=table_name) + elif table_name == Constant.TABLE_COMMUNICATION_OP: + self._query = QUERY_COMMUNICATION_TASK + QUERY_TASK_LINK_PYTORCH_API + else: + logger.error(f"FrameworkOpToKernelExport not support {table_name}") + + +class ModuleMstxRangeExport(BaseStatsExport): + + def __init__(self, db_path, recipe_name, domain_name=None): + super().__init__(db_path, recipe_name) + filter_statement = "" + if domain_name: + filter_statement = "AND str_domain.value = ?" + self.set_params([domain_name.replace('"', "'")]) # 用单引号替换双引号 + self._query = QUERY_MSTX_RANGE_WITH_DOMAIN.format(filter_statement) diff --git a/profiler/msprof_analyze/version.txt b/profiler/msprof_analyze/version.txt index 359a5b952d49f3592571e2af081510656029298e..da1561810140246b6a6971f317f6bbe9a2643ce2 100644 --- a/profiler/msprof_analyze/version.txt +++ b/profiler/msprof_analyze/version.txt @@ -1 +1 @@ -2.0.0 \ No newline at end of file +8.1.0 \ No newline at end of file