diff --git a/profiler/msprof_analyze/cli/cluster_cli.py b/profiler/msprof_analyze/cli/cluster_cli.py
index 0cdb2bd2b10b2ede411d10221e36a51e3f015e12..b0c9fb6ec4661bfc6432291f76eeefc07db7767d 100644
--- a/profiler/msprof_analyze/cli/cluster_cli.py
+++ b/profiler/msprof_analyze/cli/cluster_cli.py
@@ -32,7 +32,7 @@ context_settings['ignore_unknown_options'] = True
@click.option('--data_simplification', is_flag=True, help='data simplification switch for db data')
@click.option('--force', is_flag=True, help="Indicates whether to skip file size verification and owner verification")
@click.option("--parallel_mode", type=str, help="context mode", default="concurrent")
-@click.option("--export_type", help="recipe export type", type=click.Choice(["db", "notebook"]), default="db")
+@click.option("--export_type", help="recipe export type", type=click.Choice(["db", "notebook", "excel"]), default="db")
@click.option("--rank_list", type=str, help="Rank id list", default='all')
@click.argument('args', nargs=-1)
def cluster_cli(**kwargs) -> None:
diff --git a/profiler/msprof_analyze/cluster_analyse/README.md b/profiler/msprof_analyze/cluster_analyse/README.md
index e488ab85f76c40da44d188ac421abe7f3b533da1..40515ca88ae21fc03f2cac4b79ae06a2767f68d3 100644
--- a/profiler/msprof_analyze/cluster_analyse/README.md
+++ b/profiler/msprof_analyze/cluster_analyse/README.md
@@ -54,45 +54,47 @@ experimental_config = torch_npu.profiler._ExperimentalConfig(
参数说明:
- | 参数名 | 说明 | 是否必选 |
- | --------------------- | ------------------------------------------------------------ | -------- |
- | --profiling_path或-d | 性能数据汇集目录。未配置-o参数时,运行分析脚本之后会在该目录下自动创建cluster_analysis_output文件夹,保存分析数据。 | 是 |
- | --output_path或-o | 自定义输出路径,运行分析脚本之后会在该目录下自动创建cluster_analysis_output文件夹,保存分析数据。 | 否 |
- | --mode或-m | 数据解析模式,取值详见“**--mode参数说明**”表。 | 否 |
- | --data_simplification | 数据精简模式。对于数据量过大的性能数据db文件,可以通过配置该参数将数据精简,并提高工具分析效率。配置该参数表示开启数据精简,默认未配置表示关闭。 | 否 |
- | --force | 强制执行cluster。配置后可强制跳过如下情况:
指定的目录、文件的用户属主不属于当前用户,忽略属主判断直接执行。
csv文件大于5G、json文件大于10G、db文件大于8G,忽略文件过大判断直接执行。
配置该参数表示开启强制执行,默认未配置表示关闭。 | 否 |
- | --parallel_mode | 设置收集多卡、多节点db数据时的并发方式。取值为concurrent(使用concurrent.feature进程池实现并发)。
**只有-m配置cann_api_sum、compute_op_sum、hccl_sum、mstx_sum时可配置此参数。** | 否 |
- | --export_type | 设置导出的数据形式。取值为db(.db格式文件)和notebook(Jupyter Notebook文件),默认值为db。
**只有-m配置cann_api_sum、compute_op_sum、hccl_sum、mstx_sum时可配置此参数。** | 否 |
- | --rank_list | 对特定Rank上的数据进行统计,默认值为all(表示对所有Rank进行统计),须根据实际卡的Rank ID配置。应配置为大于等于0的整数,若所配置的值大于实际训练所运行的卡的Rank ID,则仅解析合法的RankID的数据,比如当前环境Rank ID为0到7,实际训练运行0到3卡,此时若配置Rank ID为0, 3, 4或不存在的10等其他值,则仅解析0和3。配置示例:--rank_list 0, 1, 2。
**只有-m配置cann_api_sum、compute_op_sum、hccl_sum、mstx_sum时可配置此参数。** | 否 |
- | --top_num | 设置TopN耗时的通信算子的数量,默认值为15,配置示例:--top_num 20。
**只有-m配置hccl_sum时可配置此参数。** | 否 |
- | --exclude_op_name | 控制compute_op_name结果是否包含op_name,示例:--exclude_op_name,后面不需要跟参数。
**只有-m配置compute_op_sum时可配置此参数。** | 否 |
- | --bp | 要对比的标杆集群数据,示例:--bp {bp_cluster_profiling_path},表示profiling_path和bp_cluster_profiling_path的数据进行对比。
**只有-m配置cluster_time_compare_summary时可配置此参数。** | 否 |
-
+ | 参数名 | 说明 | 是否必选 |
+ | -------------------- |------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -------- |
+ | --profiling_path或-d | 性能数据汇集目录。未配置-o参数时,运行分析脚本之后会在该目录下自动创建cluster_analysis_output文件夹,保存分析数据。 | 是 |
+ | --output_path或-o | 自定义输出路径,运行分析脚本之后会在该目录下自动创建cluster_analysis_output文件夹,保存分析数据。 | 否 |
+ | --mode或-m | 数据解析模式,取值详见“**--mode参数说明**”表。 | 否 |
+ | --data_simplification | 数据精简模式。对于数据量过大的性能数据db文件,可以通过配置该参数将数据精简,并提高工具分析效率。配置该参数表示开启数据精简,默认未配置表示关闭。 | 否 |
+ | --force | 强制执行cluster。配置后可强制跳过如下情况:
指定的目录、文件的用户属主不属于当前用户,忽略属主判断直接执行。
csv文件大于5G、json文件大于10G、db文件大于8G,忽略文件过大判断直接执行。
配置该参数表示开启强制执行,默认未配置表示关闭。 | 否 |
+ | --parallel_mode       | 设置收集多卡、多节点db数据时的并发方式。取值为concurrent(使用concurrent.futures进程池实现并发)。
**只有-m配置cann_api_sum、compute_op_sum、hccl_sum、mstx_sum时可配置此参数。** | 否 |
+ | --export_type         | 设置导出的数据形式。取值为db(.db格式文件)、notebook(Jupyter Notebook文件)和excel(Excel格式文件),默认值为db。
**只有-m配置cann_api_sum、compute_op_sum、hccl_sum、mstx_sum、module_statistic等分析模式时可配置此参数,各模式支持的导出类型详见“--mode参数说明”表。** | 否 |
+ | --rank_list | 对特定Rank上的数据进行统计,默认值为all(表示对所有Rank进行统计),须根据实际卡的Rank ID配置。应配置为大于等于0的整数,若所配置的值大于实际训练所运行的卡的Rank ID,则仅解析合法的RankID的数据,比如当前环境Rank ID为0到7,实际训练运行0到3卡,此时若配置Rank ID为0, 3, 4或不存在的10等其他值,则仅解析0和3。配置示例:--rank_list 0, 1, 2。
**只有-m配置cann_api_sum、compute_op_sum、hccl_sum、mstx_sum时可配置此参数。** | 否 |
+ | --top_num | 设置TopN耗时的通信算子的数量,默认值为15,配置示例:--top_num 20。
**只有-m配置hccl_sum时可配置此参数。** | 否 |
+ | --exclude_op_name | 控制compute_op_name结果是否包含op_name,示例:--exclude_op_name,后面不需要跟参数。
**只有-m配置compute_op_sum时可配置此参数。** | 否 |
+ | --bp | 要对比的标杆集群数据,示例:--bp {bp_cluster_profiling_path},表示profiling_path和bp_cluster_profiling_path的数据进行对比。
**只有-m配置cluster_time_compare_summary时可配置此参数。** | 否 |
+
+
--mode参数说明:
--mode参数设置不同的数据解析模式,可调用不同的分析能力,交付件详细内容请参见[recipe结果和cluster_analysis.db交付件表结构说明](#recipe结果和cluster_analysisdb交付件表结构说明)。
- | 参数名 | 说明 | 是否必选 |
- |------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----|
- | communication_matrix | 解析通信矩阵数据。 | 否 |
- | communication_time | 解析通信耗时数据。 | 否 |
- | all | 同时解析通信矩阵communication_matrix和通信耗时数据communication_time,--mode参数默认值为all。 | 否 |
- | cann_api_sum | 集群API性能数据汇总分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。--export_type为db时,输出交付件cluster_analysis.db;--export_type为notebook时,在cluster_analysis_output/CannApiSum目录下输出交付件stats.ipynb。 | 否 |
- | compute_op_sum | 集群场景性能数据的device运行算子信息汇总分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。--export_type为db时,输出交付件cluster_analysis.db;--export_type为notebook时,在cluster_analysis_output/ComputeOpSum目录下输出交付件stats.ipynb;可根据实际情况决定是否是否打开--exclude_op_name。 | 否 |
- | hccl_sum | 集合通信算子耗时分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。--export_type为db时,输出交付件cluster_analysis.db;--export_type为notebook时,在cluster_analysis_output/HcclSum目录下输出交付件stats.ipynb。 | 否 |
- | mstx_sum | 集群场景mstx打点信息汇总分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。--export_type为db时,输出交付件cluster_analysis.db;--export_type为notebook时,在cluster_analysis_output/MstxSum目录下输出交付件stats.ipynb。 | 否 |
- | slow_link | 集群慢链路异常分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。--export_type为db时,输出交付件cluster_analysis.db;--export_type为notebook时,在cluster_analysis_output/SlowLink目录下输出交付件stats.ipynb。 | 否 |
- | cluster_time_summary | 集群场景性能数据分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db和analysis.db文件。--export_type为db时,输出交付件cluster_analysis.db,db里面有ClusterTimeSummary,不支持导出notebook。 | 否 |
- | cluster_time_compare_summary | 集群场景性能数据对比分析,使用前集群数据必须先分析cluster_time_summary,需要配合--bp参数使用。输入性能数据需要基于cluster_analysis_output下的cluster_analysis.db文件。--export_type为db时,输出交付件cluster_analysis.db,db文件中有对比结果的表ClusterTimeCompareSummary,不支持导出notebook。 | 否 |
- | slow_rank_pp_stage | 集群场景性能数据pp stage通信对比分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。输入性能数据中MetaData表如果没有包含训练任务的并行策略,则需要通过--tp --pp --dp手动传入,数据类型为正整数。--export_type为db时,输出交付件cluster_analysis.db,db文件中有分析结果PPAnalysisResult和P2PAnalysisResult,不支持导出notebook。 | 否 |
- | freq_analysis | 集群场景aicore frequency信息汇总分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。打印输出是否aicore存在空闲(频率为800MHz)、异常(频率不为1800MHz或800MHz)的现象。如果有,则在输出交付件cluster_analysis.db增加对应的卡和频率信息。 | 否 |
- | ep_load_balance | 集群场景moe负载信息汇总分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。输出交付件cluster_analysis.db增加EPTokensSummary, TopEPTokensInfo分析表格。 | 否 |
- | mstx2comm | 基于ascend_pytorch_profiler_{rank_id}.db文件,将通信内置打点数据转换成通信算子。 | 否 |
- | slow_rank | 集群场景通信算子快慢卡汇总分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。输出交付件cluster_analysis.db中展示各个rank按照当前的快慢卡统计算法得出的快慢卡影响次数。 | |
- | p2p_pairing | 集群场景P2P算子生成全局关联索引,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。输出的关联索引会作为一个新的字段`opConnectionId`附在原性能数据ascend_pytorch_profiler_{rank_id}.db文件的`COMMUNICATION_OP`的表中。 | 否 |
- | filter_db | 基于ascend_pytorch_profiler_{rank_id}.db文件,提取通信类大算子数据,计算类关键函数和框架关键函数,节约90%+ 内存,促进快速全局分析。 | 否 |
- | pp_chart | 基于打点后的ascend_pytorch_profiler_{rank_id}.db文件,分析打点数据,还原pp流水图 | 否 |
- | 自定义分析参数 | 与cann_api_sum、compute_op_sum、hccl_sum等参数功能类似,用户可自定义一套性能数据的分析规则,需要详细了解性能分析的开发人员,具体开发指导请参见“[自定义分析规则开发指导](#自定义分析规则开发指导)”。 | 否 |
+ | 参数名 | 说明 | 是否必选 | export_type支持类型 |
+ |------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----|-----------------|
+ | communication_matrix | 解析通信矩阵数据。 | 否 | / |
+ | communication_time | 解析通信耗时数据。 | 否 | / |
+ | all | 同时解析通信矩阵communication_matrix和通信耗时数据communication_time,--mode参数默认值为all。 | 否 | / |
+ | cann_api_sum                 | 集群API性能数据汇总分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。--export_type为db时,输出交付件cluster_analysis.db;--export_type为notebook时,在cluster_analysis_output/CannApiSum目录下输出交付件stats.ipynb。 | 否 | db, notebook |
+ | compute_op_sum               | 集群场景性能数据的device运行算子信息汇总分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。--export_type为db时,输出交付件cluster_analysis.db;--export_type为notebook时,在cluster_analysis_output/ComputeOpSum目录下输出交付件stats.ipynb;可根据实际情况决定是否打开--exclude_op_name。 | 否 | db, notebook |
+ | hccl_sum                     | 集合通信算子耗时分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。--export_type为db时,输出交付件cluster_analysis.db;--export_type为notebook时,在cluster_analysis_output/HcclSum目录下输出交付件stats.ipynb。 | 否 | db, notebook |
+ | mstx_sum                     | 集群场景mstx打点信息汇总分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。--export_type为db时,输出交付件cluster_analysis.db;--export_type为notebook时,在cluster_analysis_output/MstxSum目录下输出交付件stats.ipynb。 | 否 | db, notebook |
+ | slow_link                    | 集群慢链路异常分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。--export_type为db时,输出交付件cluster_analysis.db;--export_type为notebook时,在cluster_analysis_output/SlowLink目录下输出交付件stats.ipynb。 | 否 | db, notebook |
+ | cluster_time_summary | 集群场景性能数据分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db和analysis.db文件。--export_type为db时,输出交付件cluster_analysis.db,db里面有ClusterTimeSummary。 | 否 | db |
+ | cluster_time_compare_summary | 集群场景性能数据对比分析,使用前集群数据必须先分析cluster_time_summary,需要配合--bp参数使用。输入性能数据需要基于cluster_analysis_output下的cluster_analysis.db文件。--export_type为db时,输出交付件cluster_analysis.db,db文件中有对比结果的表ClusterTimeCompareSummary。 | 否 | db |
+ | slow_rank_pp_stage | 集群场景性能数据pp stage通信对比分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。输入性能数据中MetaData表如果没有包含训练任务的并行策略,则需要通过--tp --pp --dp手动传入,数据类型为正整数。--export_type为db时,输出交付件cluster_analysis.db,db文件中有分析结果PPAnalysisResult和P2PAnalysisResult。 | 否 | db |
+ | freq_analysis | 集群场景aicore frequency信息汇总分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。打印输出是否aicore存在空闲(频率为800MHz)、异常(频率不为1800MHz或800MHz)的现象。如果有,则在输出交付件cluster_analysis.db增加对应的卡和频率信息。 | 否 | db |
+ | ep_load_balance | 集群场景moe负载信息汇总分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。输出交付件cluster_analysis.db增加EPTokensSummary, TopEPTokensInfo分析表格。 | 否 | db |
+ | mstx2commop | 基于ascend_pytorch_profiler_{rank_id}.db文件,将通信内置打点数据转换成通信算子。 | 否 | db |
+ | slow_rank | 集群场景通信算子快慢卡汇总分析,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。输出交付件cluster_analysis.db中展示各个rank按照当前的快慢卡统计算法得出的快慢卡影响次数。 | 否 | db |
+ | p2p_pairing | 集群场景P2P算子生成全局关联索引,输入性能数据需要基于ascend_pytorch_profiler_{rank_id}.db文件。输出的关联索引会作为一个新的字段`opConnectionId`附在原性能数据ascend_pytorch_profiler_{rank_id}.db文件的`COMMUNICATION_OP`的表中。 | 否 | db |
+ | filter_db | 基于ascend_pytorch_profiler_{rank_id}.db文件,提取通信类大算子数据,计算类关键函数和框架关键函数,节约90%+ 内存,促进快速全局分析。 | 否 | db |
+ | pp_chart | 基于打点后的ascend_pytorch_profiler_{rank_id}.db文件,分析打点数据,还原pp流水图 | 否 | db |
+ | module_statistic | 基于打点后的ascend_pytorch_profiler_{rank_id}.db文件,分析打点数据,拆解模型结构与算子关联,具体指导请参见“[模型结构拆解指南](../docs/features/module_statistic.md)” | 否 | db, excel |
+ | 自定义分析参数 | 与cann_api_sum、compute_op_sum、hccl_sum等参数功能类似,用户可自定义一套性能数据的分析规则,需要详细了解性能分析的开发人员,具体开发指导请参见“[自定义分析规则开发指导](#自定义分析规则开发指导)”。 | 否 | / |
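+
+例如,将模型结构拆解(module_statistic)结果导出为Excel的参考命令如下(其中./cluster_data为假设的集群性能数据目录,请替换为实际路径):
+
+```
+msprof-analyze cluster -m module_statistic -d ./cluster_data --export_type excel
+```
+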
--parallel_mode参数示例如下:
diff --git a/profiler/msprof_analyze/cluster_analyse/cluster_analysis.py b/profiler/msprof_analyze/cluster_analyse/cluster_analysis.py
index 6464bb732ddf57b2790d99ac7148ce3ecaf327ce..43e716efa116b0613fcb8a14fe6555afcdbb5c46 100644
--- a/profiler/msprof_analyze/cluster_analyse/cluster_analysis.py
+++ b/profiler/msprof_analyze/cluster_analyse/cluster_analysis.py
@@ -140,7 +140,8 @@ def cluster_analysis_main():
parser.add_argument('--force', action='store_true',
help="Indicates whether to skip file size verification and owner verification")
parser.add_argument("--parallel_mode", type=str, help="context mode", default="concurrent")
- parser.add_argument("--export_type", type=str, help="recipe export type", choices=["db", "notebook"], default="db")
+ parser.add_argument("--export_type", type=str, help="recipe export type", choices=["db", "notebook", "excel"],
+ default="db")
parser.add_argument("--rank_list", type=str, help="Rank id list", default='all')
args, extra_args = parser.parse_known_args()
diff --git a/profiler/msprof_analyze/cluster_analyse/common_func/excel_utils.py b/profiler/msprof_analyze/cluster_analyse/common_func/excel_utils.py
new file mode 100644
index 0000000000000000000000000000000000000000..3ca5308b5963ab9e408adf433711028c06a5fc4e
--- /dev/null
+++ b/profiler/msprof_analyze/cluster_analyse/common_func/excel_utils.py
@@ -0,0 +1,175 @@
+# Copyright (c) 2025, Huawei Technologies Co., Ltd.
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import List, Dict, Optional
+
+import pandas as pd
+
+from msprof_analyze.prof_common.logger import get_logger
+
+logger = get_logger()
+
+
+class ExcelUtils:
+ DEFAULT_FORMAT = {
+ 'valign': 'vcenter',
+ 'border': 1,
+ 'font_name': 'Times New Roman'
+ }
+ DEFAULT_HEADER_FORMAT = {
+ 'valign': 'vcenter',
+ 'bold': True,
+ 'border': 1,
+ 'bg_color': '#AFEEEE',
+ 'font_name': 'Times New Roman'
+ }
+
+ def __init__(self):
+ self.workbook = None
+ self.writer = None
+ self.worksheet = None
+ self.df = None
+ self._formats_cache = {}
+
+ def clear(self) -> None:
+ """显式清理资源,允许实例重复使用"""
+ if self.writer:
+ self.writer.close()
+ self.workbook = None
+ self.writer = None
+ self.worksheet = None
+ self.df = None
+ self._formats_cache = {}
+
+ def create_excel_writer(self, output_file: str,
+ df: pd.DataFrame,
+ column_format: Optional[Dict] = None,
+ header_format: Optional[Dict] = None,
+ sheet_name: str = 'Sheet1'):
+ """初始化Excel写入器并写入原始数据"""
+ self.writer = pd.ExcelWriter(output_file, engine='xlsxwriter')
+ self.workbook = self.writer.book
+ self.worksheet = self.workbook.add_worksheet(sheet_name)
+ self.df = df
+
+ # 写入标题行
+ header_fmt = self._get_format(header_format if header_format else self.DEFAULT_HEADER_FORMAT)
+ for col_idx, col_name in enumerate(df.columns):
+ self.worksheet.write(0, col_idx, col_name, header_fmt)
+
+ # 写入数据行
+ default_fmt = self._get_format(column_format if column_format else self.DEFAULT_FORMAT)
+ for row_idx, row in df.iterrows():
+ for col_idx, col_name in enumerate(df.columns):
+ self.worksheet.write(row_idx + 1, col_idx, row[col_name], default_fmt)
+
+ def save_and_close(self):
+ """保存并关闭 Excel 文件"""
+ if self.writer:
+ self.writer.close()
+ self.writer = None
+ self.workbook = None
+ self.worksheet = None
+
+ def set_column_width(self, columns_config: Dict[str, int]):
+ """设置列宽"""
+ if not self.worksheet:
+ raise Exception("Worksheet has not been initialized!")
+
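+        # 注意:列索引按columns_config中键的先后顺序确定,需与写入DataFrame的列顺序保持一致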
+ for col, width in columns_config.items():
+ col_idx = list(columns_config.keys()).index(col)
+ self.worksheet.set_column(col_idx, col_idx, width)
+
+ def set_row_height(self, row: int, height: int):
+ """设置指定行的行高"""
+ if not self.worksheet:
+ raise Exception("Worksheet not initialized")
+ self.worksheet.set_row(row, height)
+
+ def freeze_panes(self, row: int = 1, col: int = 0):
+ """冻结窗格"""
+ if not self.worksheet:
+ raise Exception("Worksheet has not been initialized!")
+ self.worksheet.freeze_panes(row, col)
+
+ def merge_duplicate_cells(
+ self,
+ columns_to_merge: List[str],
+ merge_format: Optional[Dict] = None,
+ header_format: Optional[Dict] = None
+ ):
+ """
+ 合并连续相同值的单元格
+
+ 参数:
+ columns_to_merge: 需要合并的列名列表
+ merge_format: 合并单元格的格式字典
+ header_format: 标题行的格式字典
+ """
+ if not self.workbook or not self.worksheet:
+ raise Exception("Worksheet has not been initialized!")
+
+ # 设置格式
+ merge_fmt = self._get_format(merge_format if merge_format else self.DEFAULT_FORMAT)
+ header_fmt = self._get_format(header_format if header_format else self.DEFAULT_HEADER_FORMAT)
+
+ # 应用标题行格式
+ for col_num, value in enumerate(self.df.columns.values):
+ self.worksheet.write(0, col_num, value, header_fmt)
+
+ # 遍历需要合并的列
+ for col in columns_to_merge:
+ if col not in self.df.columns:
+ logger.warning(f"Invalid column: {col}, not in dataframe!")
+ continue
+
+ col_idx = self.df.columns.get_loc(col)
+ current_value = None
+ start_row = 1 # 第一行是标题,从第二行开始
+ merge_count = 0
+
+ for i in range(len(self.df)):
+ excel_row = i + 1
+
+ if self.df[col].iloc[i] == current_value:
+ continue
+ else:
+ if current_value is not None and (excel_row - 1) > start_row:
+ self.worksheet.merge_range(
+ start_row, col_idx, excel_row - 1, col_idx,
+ current_value,
+ merge_fmt
+ )
+ merge_count += 1
+
+ current_value = self.df[col].iloc[i]
+ start_row = excel_row
+
+ # 处理最后一组连续相同的值
+ if current_value is not None and (len(self.df)) > start_row:
+ self.worksheet.merge_range(
+ start_row, col_idx, len(self.df), col_idx,
+ current_value,
+ merge_fmt
+ )
+ merge_count += 1
+
+ def _get_format(self, format_dict: Dict):
+ """获取或创建格式对象(带缓存)"""
+ format_key = frozenset(format_dict.items())
+ if format_key not in self._formats_cache:
+ self._formats_cache[format_key] = self.workbook.add_format(format_dict)
+ return self._formats_cache[format_key]
\ No newline at end of file
diff --git a/profiler/msprof_analyze/cluster_analyse/recipes/module_statistic/__init__.py b/profiler/msprof_analyze/cluster_analyse/recipes/module_statistic/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/profiler/msprof_analyze/cluster_analyse/recipes/module_statistic/module_statistic.py b/profiler/msprof_analyze/cluster_analyse/recipes/module_statistic/module_statistic.py
new file mode 100644
index 0000000000000000000000000000000000000000..91a354f81ae0838e8e96688fa47e2a8e45057060
--- /dev/null
+++ b/profiler/msprof_analyze/cluster_analyse/recipes/module_statistic/module_statistic.py
@@ -0,0 +1,347 @@
+# Copyright (c) 2025, Huawei Technologies Co., Ltd.
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+from collections import defaultdict, deque
+
+import pandas as pd
+
+from msprof_analyze.cluster_analyse.common_func.excel_utils import ExcelUtils
+from msprof_analyze.prof_common.db_manager import DBManager
+from msprof_analyze.prof_exports.module_statistic_export import FrameworkOpToKernelExport, ModuleMstxRangeExport
+from msprof_analyze.cluster_analyse.recipes.base_recipe_analysis import BaseRecipeAnalysis
+from msprof_analyze.prof_common.constant import Constant
+from msprof_analyze.prof_common.logger import get_logger
+
+logger = get_logger()
+
+
+class NodeType:
+ MODULE_EVENT_NODE = 0
+ CPU_OP_EVENT = 1
+ KERNEL_EVENT = 2
+
+
+class TreeNode:
+ def __init__(self, start, end, node_type, name):
+ self.start = start
+ self.end = end
+ self.node_type = node_type
+ self.name = name
+ self.children = []
+
+ def add_child(self, node):
+ self.children.append(node)
+
+
+class ModuleStatistic(BaseRecipeAnalysis):
+ TABLE_MODULE_STATISTIC = "ModuleStatistic"
+ KERNEL_RELATED_TABLE_LIST = [Constant.TABLE_COMPUTE_TASK_INFO, Constant.TABLE_COMMUNICATION_OP,
+ Constant.TABLE_COMMUNICATION_SCHEDULE_TASK_INFO]
+ MODULE_DOMAIN_NAME = 'Module'
+ MAX_TRAVERSE_DEPTH = 20
+
+ EXCEL_COLUMN_WIDTH_CONFIG = {"Parent Module": 40,
+ "Module": 40,
+ "Op Name": 40,
+ "Kernel List": 60,
+ "Total Kernel Duration(ns)": 15,
+ "Avg Kernel Duration(ns)": 15,
+ "Op Count": 15}
+ EXCEL_COLUMN_ALIGN_CONFIG = {"Total Kernel Duration(ns)": 'center',
+ "Avg Kernel Duration(ns)": 'center',
+ "Op Count": 'center'}
+
+ def __init__(self, params):
+ super().__init__(params)
+
+ @property
+ def base_dir(self):
+ return os.path.basename(os.path.dirname(__file__))
+
+ def run(self, context):
+        if self._export_type != Constant.DB and self._export_type != Constant.EXCEL:
+            logger.error(f"Invalid export type: {self._export_type} for module analysis, "
+                         f"required to be {Constant.DB} or {Constant.EXCEL}")
+            return
+        mapper_res = self.mapper_func(context)
+ if self._export_type == Constant.DB:
+ total_df = self.reducer_func(mapper_res)
+ self.save_db(total_df)
+ elif self._export_type == Constant.EXCEL:
+ self.save_csv(mapper_res)
+
+ def reducer_func(self, mapper_res):
+ valid_dfs = [stat_df.assign(rankID=rank_id)
+ for rank_id, stat_df in mapper_res
+ if not stat_df.empty]
+ return pd.concat(valid_dfs, ignore_index=True) if valid_dfs else None
+
+ def save_db(self, df):
+ if df is None or df.empty:
+ logger.warning(f"No module analysis result, skipping dump data")
+ return
+ self.dump_data(df, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER,
+ self.TABLE_MODULE_STATISTIC, index=False)
+
+ def save_csv(self, mapper_res):
+ columns_to_merge = ['Parent Module', 'Module']
+ column_width_config = {"Parent Module": 40,
+ "Module": 40,
+ "Op Name": 40,
+ "Kernel List": 50,
+ "Total Kernel Duration(ns)": 10,
+ "Avg Kernel Duration(ns)": 10,
+ "Op Count": 10}
+ excel_utils = ExcelUtils()
+ for rank_id, stat_df in mapper_res:
+ if stat_df.empty:
+ logger.warning(f"No module analysis result for rank {rank_id}, skipping dump data")
+ continue
+ file_name = f"module_statistic_{rank_id}.xlsx"
+ file_path = os.path.join(self.output_path, file_name)
+ try:
+ excel_utils.create_excel_writer(file_path, stat_df)
+ excel_utils.merge_duplicate_cells(columns_to_merge)
+ excel_utils.set_column_width(column_width_config)
+ excel_utils.set_row_height(0, 27) # 标题行行高27
+ excel_utils.save_and_close()
+ excel_utils.clear()
+ except Exception as e:
+ logger.error(f"Save excel failed, err: {e}")
+
+ def _mapper_func(self, data_map, analysis_class):
+ """
+
+ kernel_df = pd.DataFrame()
+ """
+ # Query Data
+ profiler_db_path = data_map.get(Constant.PROFILER_DB_PATH)
+ rank_id = data_map.get(Constant.RANK_ID)
+ module_export = ModuleMstxRangeExport(profiler_db_path, self._recipe_name, domain_name=self.MODULE_DOMAIN_NAME)
+ module_df = module_export.read_export_db() # module_df columns:["startNs", "endNs", "name"]
+ if module_df is None or module_df.empty:
+ logger.error(f"Can not export mstx range event with domain: {self.MODULE_DOMAIN_NAME}, from rank {rank_id}")
+ return rank_id, pd.DataFrame()
+ # kernel_df columns:["kernel_name", "kernel_ts", "kernel_end", "op_name", "op_ts", "op_end"]
+ kernel_df = self._query_framework_op_to_kernel(profiler_db_path)
+ if kernel_df is None or kernel_df.empty:
+ logger.error(f"Can not export framework op to kernel mapper from rank {rank_id}")
+ return rank_id, pd.DataFrame()
+ # Convert time related columns' type to be int
+ mstx_time_columns = ['startNs', 'endNs']
+ module_df[mstx_time_columns] = module_df[mstx_time_columns].astype(int)
+ kernel_time_columns = ['kernel_ts', 'kernel_end', 'op_ts', 'op_end']
+ kernel_df[kernel_time_columns] = kernel_df[kernel_time_columns].astype(int)
+
+ # Build Tree
+ root = self._build_node_tree(module_df, kernel_df)
+ if not root:
+ logger.error(f"Empty event tree for rank {rank_id}")
+ return rank_id, pd.DataFrame()
+ # Convert to Dataframe
+ verbose_df = self._flatten_tree_to_dataframe(root)
+ if verbose_df.empty:
+ logger.error(f"Failed to extract event tree data for rank {rank_id}")
+ return rank_id, pd.DataFrame()
+ # 区分module与算子序列,并计算device_time平均值
+ stat_df = self._aggregate_module_operator_stats(verbose_df.copy())
+ return rank_id, stat_df
+
+ def _query_framework_op_to_kernel(self, profiler_db_path):
+ valid_dfs = []
+ for table_name in self.KERNEL_RELATED_TABLE_LIST:
+ if not DBManager.check_tables_in_db(profiler_db_path, table_name):
+ continue
+ export = FrameworkOpToKernelExport(profiler_db_path, self._recipe_name, table_name)
+ df = export.read_export_db()
+ if df is not None and not df.empty:
+ valid_dfs.append(df)
+ if not valid_dfs:
+ return None
+ try:
+ return pd.concat(valid_dfs, ignore_index=True)
+ except Exception as e:
+ logger.error(f"Failed to concatenate framework op to kernel dataframes: {str(e)}")
+ return None
+
+ def _build_node_tree(self, module_df, kernel_df):
+ nodes = []
+
+ # 1. 创建mstx节点
+ for _, row in module_df.iterrows():
+ nodes.append(TreeNode(
+ row['startNs'],
+ row['endNs'],
+ NodeType.MODULE_EVENT_NODE,
+ row['name']
+ ))
+
+ # 2. 按op_name分组处理kernel数据
+ op_groups = defaultdict(list)
+ for _, row in kernel_df.iterrows():
+ op_groups[(row['op_name'], row['op_ts'], row['op_end'])].append(row)
+
+ # 3. 创建op节点和对应的kernel节点
+ for (op_name, op_ts, op_end), kernels in op_groups.items():
+ # 创建op节点
+ op_node = TreeNode(
+ op_ts,
+ op_end,
+ NodeType.CPU_OP_EVENT,
+ op_name
+ )
+
+ # 为每个op添加对应的kernel节点
+ for kernel in kernels:
+ kernel_node = TreeNode(
+ kernel['kernel_ts'],
+ kernel['kernel_end'],
+ NodeType.KERNEL_EVENT,
+ kernel['kernel_name']
+ )
+ op_node.add_child(kernel_node)
+
+ nodes.append(op_node)
+
+ if not nodes:
+ logger.error(f"Empty node (module_event/cpu_op/kernel), skipping tree build")
+ return None
+
+ # 4. 按开始时间升序、结束时间降序排序
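+        # 范围更大的外层节点排在前面,保证后续栈式遍历时父节点先于子节点入栈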
+ nodes.sort(key=lambda x: (x.start, -x.end))
+
+ # 5. 构建树结构
+ root = self._create_root_node(module_df, kernel_df)
+ stack = [root]
+
+ for node in nodes:
+ # 找到最近的能包含当前节点的父节点
+ while stack[-1].start > node.start or stack[-1].end < node.end:
+ stack.pop()
+
+ # 添加到父节点的children中
+ stack[-1].add_child(node)
+ stack.append(node)
+
+ return root
+
+ def _create_root_node(self, module_df, kernel_df):
+ global_start = min(module_df['startNs'].min(), kernel_df['kernel_ts'].min(), kernel_df['op_ts'].min())
+ global_end = max(module_df['endNs'].max(), kernel_df['kernel_end'].max(), kernel_df['op_end'].max())
+ root = TreeNode(global_start, global_end, NodeType.MODULE_EVENT_NODE, "")
+ return root
+
+ def _flatten_tree_to_dataframe(self, root_node):
+ results = []
+
+ def traverse(node, module_deque, depth=0):
+ if depth > self.MAX_TRAVERSE_DEPTH:
+ logger.warning(f"Max traversal depth {self.MAX_TRAVERSE_DEPTH} reached, traversal stopped. "
+ f"Some information may be lost")
+ return
+ if node.node_type != NodeType.MODULE_EVENT_NODE:
+ return
+ for op_child in node.children:
+ if op_child.node_type == NodeType.MODULE_EVENT_NODE:
+ module_deque.append(node.name)
+ traverse(op_child, module_deque, depth + 1)
+ module_deque.pop()
+ if op_child.node_type == NodeType.CPU_OP_EVENT:
+ module = node.name
+ module_parent = "/".join(module_deque)
+ # 跳过没有module归属的op
+ if not module and not module_parent:
+ continue
+ # 收集该op下的所有kernel信息
+ kernel_names = []
+ total_device_time = 0.0
+ for kernel_child in op_child.children:
+ if kernel_child.node_type == NodeType.KERNEL_EVENT:
+ kernel_names.append(kernel_child.name)
+ duration = kernel_child.end - kernel_child.start
+ total_device_time += duration
+ results.append({
+ 'module_parent': module_parent,
+ 'module': module,
+ 'module_start': node.start,
+ 'module_end': node.end,
+ 'op_name': op_child.name,
+ 'op_start': op_child.start,
+ 'op_end': op_child.end,
+ 'kernel_list': ', '.join(kernel_names),
+ 'device_time': total_device_time
+ })
+
+ traverse(root_node, deque(), 0)
+ if not results:
+ return pd.DataFrame()
+ # 转换为DataFrame并排序
+ df = pd.DataFrame(results)
+ df = df.sort_values(by=['module_start', 'op_start'], ascending=[True, True])
+ return df
+
+ def _aggregate_module_operator_stats(self, df):
+ if df is None or df.empty:
+ logger.warning("Empty dataframe received for aggregation")
+ return pd.DataFrame()
+
+ # 为每个算子添加在module下的顺序位置
+ distinct_module_columns = ['module_parent', 'module', 'module_start', 'module_end']
+ df['op_order'] = df.groupby(distinct_module_columns).cumcount()
+ # 创建seq_key保证唯一性,并分配ID
+ op_seq = df.groupby(distinct_module_columns)['op_name'].transform(lambda x: '/'.join(x))
+ df['seq_key'] = df['module_parent'] + "|" + df['module'] + "|" + op_seq
+        df['seq_id'] = pd.factorize(df['seq_key'])[0]
+ df.drop(columns=['seq_key'], inplace=True)
+
+ stat_df = (
+ df.groupby(['module_parent', 'module', 'op_name', 'op_order', 'kernel_list', 'seq_id'])
+ .agg(
+ module_start=('module_start', 'first'), # 取第一个 module_start
+ module_end=('module_end', 'first'), # 取第一个 module_end
+ total_kernel_duration=('device_time', 'sum'), # 计算 device_time 的总时间
+ avg_kernel_duration=('device_time', 'mean'), # 计算 device_time 的平均时间
+ op_count=('device_time', 'count') # 计算每组的行数(计数)
+ ).reset_index()
+ )
+ stat_df = (stat_df.sort_values(by=['module_start', 'op_order']).
+ drop(columns=['module_start', 'module_end', 'seq_id', 'op_order']).reset_index(drop=True))
+
+ return self._format_stat_df_columns(stat_df)
+
+ def _format_stat_df_columns(self, stat_df):
+ try:
+ if self._export_type == Constant.DB:
+ stat_df = stat_df.rename(columns={
+ 'module_parent': 'parentModule',
+ 'op_name': 'opName',
+ 'kernel_list': 'kernelList',
+ 'op_count': 'opCount',
+ 'total_kernel_duration': 'totalKernelDuration(ns)',
+ 'avg_kernel_duration': 'avgKernelDuration(ns)'})
+ elif self._export_type == Constant.EXCEL:
+ stat_df = stat_df.rename(columns={
+ 'module_parent': 'Parent Module',
+ 'module': 'Module',
+ 'op_name': 'Op Name',
+ 'kernel_list': 'Kernel List',
+ 'op_count': 'Op Count',
+ 'total_kernel_duration': 'Total Kernel Duration(ns)',
+ 'avg_kernel_duration': 'Avg Kernel Duration(ns)'})
+ except Exception as e:
+ logger.error(f"Failed to format statistic data's title, error message: {e}")
+ return pd.DataFrame()
+ return stat_df
+
diff --git a/profiler/msprof_analyze/docs/features/img/vllm_module_statistic.png b/profiler/msprof_analyze/docs/features/img/vllm_module_statistic.png
new file mode 100644
index 0000000000000000000000000000000000000000..1ae07065dbd22c13d84de39bd2441820746e3048
Binary files /dev/null and b/profiler/msprof_analyze/docs/features/img/vllm_module_statistic.png differ
diff --git a/profiler/msprof_analyze/docs/features/module_statistic.md b/profiler/msprof_analyze/docs/features/module_statistic.md
new file mode 100644
index 0000000000000000000000000000000000000000..bb7965831973fca6392802f25f4876860eee40ac
--- /dev/null
+++ b/profiler/msprof_analyze/docs/features/module_statistic.md
@@ -0,0 +1,171 @@
+# 模型结构拆解指南
+
+## 简介
+
+msprof-analyze提供了针对PyTorch模型自动解析模型层级结构的分析能力(module_statistic),帮助精准定位性能瓶颈,为模型优化提供关键洞察。该分析能力提供:
+
+* 模型结构拆解:自动提取并展示模型的层次化结构,以及模型中的算子调用顺序
+* 算子与Kernel映射:框架层算子与其下发到NPU上执行的Kernel之间的映射关系
+* 性能分析:精确统计并输出Device侧Kernel的执行耗时
+
+
+## 操作指导
+### 性能数据准备
+#### 1. 添加mstx打点
+
+在模型代码中调用`torch_npu.npu.mstx.range_start/range_end`性能打点接口。该打点通过重写PyTorch中nn.Module的调用逻辑实现,使各模块执行时自动生成打点事件。
+
+#### 2. 配置并采集 Profiling 数据
+
+* 使用`torch_npu.profiler`接口采集性能数据
+* 在`torch_npu.profiler._ExperimentalConfig`设置`msprof_tx=True`,开启打点事件采集
+* 在`torch_npu.profiler._ExperimentalConfig`设置`export_type`导出类型,需要包含DB
+* 性能数据落盘在`torch_npu.profiler.tensorboard_trace_handler`接口指定的路径下,将该路径下的数据作为msprof-analyze cluster的输入数据
+
+完整样例代码,详见[性能数据采集样例代码](#性能数据采集样例代码)
+
+### 执行分析命令
+
+使用以下命令启用分析功能:
+```
+msprof-analyze cluster -m module_statistic -d ./result --export_type excel
+```
+参数说明:
+* `-d`: 集群性能数据路径
+* `--export_type`: 导出类型设置,可选db, excel
+* 其余参数:与cluster集群分析功能支持的参数一致,详见[参数列表](../../cluster_analyse/README.md)
+
+### 输出说明
+输出结果体现模型层级,算子调用顺序,NPU上执行的Kernel以及统计时间
+* `export_type`设置为`excel`时,每张卡生成独立的module_statistic_{rank_id}.xlsx文件,如下图所示:
+
+  ![模型结构拆解Excel示例](img/vllm_module_statistic.png)
+
+* `export_type`设置为`db`时,结果统一保存到 cluster_analysis.db 的 ModuleStatistic 表中,字段说明如下:
+
+ | 字段名称 | 类型 | 说明 |
+ |---------------------|---------|-----------------------------------|
+ | parentModule | TEXT | 上层Module名称 |
+ | module | TEXT | 最底层Module名称 |
+ | opName | TEXT | 框架侧算子名称,同一module下,算子按照调用顺序排列 |
+ | kernelList | TEXT | 框架侧算子下发到Device侧执行Kernel的序列 |
+    | totalKernelDuration | REAL    | 框架侧算子对应Device侧Kernel运行总时间,单位纳秒(ns) |
+    | avgKernelDuration   | REAL    | 框架侧算子对应Device侧Kernel平均运行时间,单位纳秒(ns) |
+ | opCount | INTEGER | 框架侧算子在采集周期内运行的次数 |
+ | rankID | INTEGER | 集群场景的节点识别ID,集群场景下设备的唯一标识 |
+
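+例如,可参考如下Python代码读取ModuleStatistic表中的统计结果(示例片段,其中db_path为假设路径,请替换为实际cluster_analysis_output目录下的cluster_analysis.db):
+
+```
+import sqlite3
+
+import pandas as pd
+
+# 假设的cluster_analysis.db路径,请按实际输出目录替换
+db_path = "./result/cluster_analysis_output/cluster_analysis.db"
+conn = sqlite3.connect(db_path)
+df = pd.read_sql("SELECT * FROM ModuleStatistic", conn)
+conn.close()
+
+# 按Device侧Kernel总耗时降序,查看耗时Top10的算子
+print(df.sort_values("totalKernelDuration(ns)", ascending=False).head(10))
+```
+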
+
+
+## 附录
+### 性能数据采集样例代码
+
+对于复杂模型结构,建议采用选择性打点策略以降低性能开销,核心性能打点实现代码如下:
+```
+module_list = ["Attention", "QKVParallelLinear"]
+def custom_call(self, *args, **kwargs):
+ module_name = self.__class__.__name__
+ if module_name not in module_list:
+ return original_call(self, *args, **kwargs)
+ mstx_id = torch_npu.npu.mstx.range_start(module_name, domain="Module")
+ tmp = original_call(self, *args, **kwargs)
+ torch_npu.npu.mstx.range_end(mstx_id, domain="Module")
+ return tmp
+```
+完整样例代码如下:
+```
+import random
+import torch
+import torch_npu
+import torch.nn as nn
+import torch.optim as optim
+
+
+original_call = nn.Module.__call__
+
+def custom_call(self, *args, **kwargs):
+ """自定义nn.Module调用方法,添加MSTX打点"""
+ module_name = self.__class__.__name__
+ mstx_id = torch_npu.npu.mstx.range_start(module_name, domain="Module")
+ tmp = original_call(self, *args, **kwargs)
+ torch_npu.npu.mstx.range_end(mstx_id, domain="Module")
+ return tmp
+
+
+class RMSNorm(nn.Module):
+ def __init__(self, dim: int, eps: float = 1e-6):
+ super().__init__()
+ self.eps = eps
+ self.weight = nn.Parameter(torch.ones(dim))
+
+ def _norm(self, x):
+ return x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)
+
+ def forward(self, x):
+ output = self._norm(x.float()).type_as(x)
+ return output * self.weight
+
+
+class ToyModel(nn.Module):
+ def __init__(self, D_in, H, D_out):
+ super(ToyModel, self).__init__()
+ self.input_linear = torch.nn.Linear(D_in, H)
+ self.middle_linear = torch.nn.Linear(H, H)
+ self.output_linear = torch.nn.Linear(H, D_out)
+ self.rms_norm = RMSNorm(D_out)
+
+ def forward(self, x):
+ h_relu = self.input_linear(x).clamp(min=0)
+ for i in range(3):
+ h_relu = self.middle_linear(h_relu).clamp(min=random.random())
+ y_pred = self.output_linear(h_relu)
+ y_pred = self.rms_norm(y_pred)
+ return y_pred
+
+
+def train():
+ # 替换默认调用方法
+ nn.Module.__call__ = custom_call
+ N, D_in, H, D_out = 256, 1024, 4096, 64
+
+ torch.npu.set_device(6)
+ input_data = torch.randn(N, D_in).npu()
+ labels = torch.randn(N, D_out).npu()
+ model = ToyModel(D_in, H, D_out).npu()
+
+ loss_fn = nn.MSELoss()
+ optimizer = optim.SGD(model.parameters(), lr=0.001)
+
+ experimental_config = torch_npu.profiler._ExperimentalConfig(
+ aic_metrics=torch_npu.profiler.AiCMetrics.PipeUtilization,
+ profiler_level=torch_npu.profiler.ProfilerLevel.Level2,
+ l2_cache=False,
+ msprof_tx=True, # 打开msprof_tx采集
+ data_simplification=False,
+ export_type=['text', 'db'] # 导出类型中必须要包含db
+ )
+
+ prof = torch_npu.profiler.profile(
+ activities=[torch_npu.profiler.ProfilerActivity.CPU, torch_npu.profiler.ProfilerActivity.NPU],
+ schedule=torch_npu.profiler.schedule(wait=1, warmup=1, active=3, repeat=1, skip_first=5),
+ on_trace_ready=torch_npu.profiler.tensorboard_trace_handler("./result"),
+ record_shapes=True,
+ profile_memory=False,
+ with_stack=False,
+ with_flops=False,
+ with_modules=True,
+ experimental_config=experimental_config)
+ prof.start()
+
+ for i in range(12):
+ optimizer.zero_grad()
+ outputs = model(input_data)
+ loss = loss_fn(outputs, labels)
+ loss.backward()
+ optimizer.step()
+ prof.step()
+
+ prof.stop()
+
+
+if __name__ == "__main__":
+ train()
+```
diff --git a/profiler/msprof_analyze/prof_common/constant.py b/profiler/msprof_analyze/prof_common/constant.py
index 5601a9f77fe0332d8258664a40aa449fb6aafbd7..b059611372a01b5b5c4e23a6575edc965bc76f36 100644
--- a/profiler/msprof_analyze/prof_common/constant.py
+++ b/profiler/msprof_analyze/prof_common/constant.py
@@ -141,9 +141,11 @@ class Constant(object):
TABLE_STRING_IDS = "STRING_IDS"
TABLE_TASK = "TASK"
TABLE_TASK_MPU_INFO = "TASK_MPU_INFO"
+ TABLE_COMMUNICATION_SCHEDULE_TASK_INFO = "COMMUNICATION_SCHEDULE_TASK_INFO"
# export_type
NOTEBOOK = "notebook"
+ EXCEL = "excel"
# db name
DB_COMMUNICATION_ANALYZER = "analysis.db"
@@ -480,3 +482,6 @@ class Constant(object):
UINT32_MASK = 0xffffffff
INVALID_RANK_NUM = 4294967295
+
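+    # SQL参数占位符(? 或 %s)匹配模式,用于判断查询语句是否需要参数化执行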
+ SQL_PLACEHOLDER_PATTERN = r"\?|\%s"
+
diff --git a/profiler/msprof_analyze/prof_exports/base_stats_export.py b/profiler/msprof_analyze/prof_exports/base_stats_export.py
index 59d58bdff5485a6ace0f2c12dadbf543ecd4b978..786e139d2c3d26efaf229b05cf0cab957906a192 100644
--- a/profiler/msprof_analyze/prof_exports/base_stats_export.py
+++ b/profiler/msprof_analyze/prof_exports/base_stats_export.py
@@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import re
+from typing import List
+
import pandas as pd
from msprof_analyze.prof_common.db_manager import DBManager
@@ -28,21 +31,34 @@ class BaseStatsExport:
self._db_path = db_path
self._analysis_class = analysis_class
self._query = None
+ self._param = None
self.mode = Constant.ANALYSIS
def get_query(self):
return self._query
+ def set_params(self, param: List):
+        if not isinstance(param, list) or not param:
+ logger.error("Export param type must be List and not empty")
+ return
+ self._param = param
+
def read_export_db(self):
try:
+ if not self._db_path:
+ logger.error("db path is None.")
+ return None
query = self.get_query()
if query is None:
logger.error("query is None.")
return None
- conn, cursor = DBManager.create_connect_db(self._db_path, self.mode)
- data = pd.read_sql(query, conn)
+ conn, cursor = DBManager.create_connect_db(self._db_path, Constant.ANALYSIS)
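+            # 查询语句包含 ? 或 %s 占位符且已通过set_params设置参数时,采用参数化查询执行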
+ if self._param is not None and re.search(Constant.SQL_PLACEHOLDER_PATTERN, query):
+ data = pd.read_sql(query, conn, params=self._param)
+ else:
+ data = pd.read_sql(query, conn)
DBManager.destroy_db_connect(conn, cursor)
return data
except Exception as e:
logger.error(f"File {self._db_path} read failed error: {e}")
- return None
\ No newline at end of file
+ return None
diff --git a/profiler/msprof_analyze/prof_exports/module_statistic_export.py b/profiler/msprof_analyze/prof_exports/module_statistic_export.py
new file mode 100644
index 0000000000000000000000000000000000000000..800a142d629cc1f5cd523e1dd6d9382bf6d15f40
--- /dev/null
+++ b/profiler/msprof_analyze/prof_exports/module_statistic_export.py
@@ -0,0 +1,113 @@
+# Copyright (c) 2025, Huawei Technologies Co., Ltd.
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from msprof_analyze.prof_common.constant import Constant
+from msprof_analyze.prof_common.logger import get_logger
+from msprof_analyze.prof_exports.base_stats_export import BaseStatsExport
+
+
+logger = get_logger()
+
+
+QUERY_COMPUTE_TASK = """
+ WITH task_connections AS (
+ SELECT
+ str.value AS name,
+ task.startNs,
+ task.endNs,
+ conn.id AS api_conn_id
+ FROM
+ {compute_table} AS compute
+ LEFT JOIN
+ TASK task ON compute.globalTaskId = task.globalTaskId
+ LEFT JOIN
+ STRING_IDS str ON str.id = compute.name
+ LEFT JOIN
+ CONNECTION_IDS conn ON conn.connectionId = task.connectionId
+ )"""
+
+QUERY_COMMUNICATION_TASK = """
+ WITH task_connections AS (
+ SELECT
+ str.value AS name,
+ comm.startNs,
+ comm.endNs,
+ conn.id AS api_conn_id
+ FROM
+ COMMUNICATION_OP AS comm
+ JOIN
+ STRING_IDS str ON str.id = comm.opType
+ JOIN
+ CONNECTION_IDS conn ON conn.connectionId = comm.connectionId
+ )"""
+
+
+QUERY_TASK_LINK_PYTORCH_API = """
+ SELECT
+ tc.name as kernel_name,
+ tc.startNs as kernel_ts,
+ tc.endNs as kernel_end,
+ api_str.value AS op_name,
+ api.startNs as op_ts,
+ api.endNs as op_end
+ FROM
+ task_connections tc
+ JOIN
+ PYTORCH_API api ON tc.api_conn_id = api.connectionId
+ JOIN
+ STRING_IDS api_str ON api.name = api_str.id
+ ORDER BY op_ts, kernel_ts
+"""
+
+
+
+QUERY_MSTX_RANGE_WITH_DOMAIN = """
+ SELECT
+ mstx.startNs,
+ mstx.endNs,
+ str_name.value AS name
+ FROM
+ MSTX_EVENTS mstx
+ LEFT JOIN
+ STRING_IDS str_name ON mstx.message = str_name.id
+ LEFT JOIN
+ STRING_IDS str_domain ON mstx.domainId = str_domain.id
+ WHERE
+ mstx.eventType = 2
+ {}
+ ORDER BY mstx.startNs
+"""
+
+
+class FrameworkOpToKernelExport(BaseStatsExport):
+
+ def __init__(self, db_path, recipe_name, table_name):
+ super().__init__(db_path, recipe_name)
+ if table_name in [Constant.TABLE_COMPUTE_TASK_INFO, Constant.TABLE_COMMUNICATION_SCHEDULE_TASK_INFO]:
+ self._query = (QUERY_COMPUTE_TASK + QUERY_TASK_LINK_PYTORCH_API).format(compute_table=table_name)
+ elif table_name == Constant.TABLE_COMMUNICATION_OP:
+ self._query = QUERY_COMMUNICATION_TASK + QUERY_TASK_LINK_PYTORCH_API
+ else:
+ logger.error(f"FrameworkOpToKernelExport not support {table_name}")
+
+
+class ModuleMstxRangeExport(BaseStatsExport):
+
+ def __init__(self, db_path, recipe_name, domain_name=None):
+ super().__init__(db_path, recipe_name)
+ filter_statement = ""
+ if domain_name:
+ filter_statement = "AND str_domain.value = ?"
+ self.set_params([domain_name.replace('"', "'")]) # 用单引号替换双引号
+ self._query = QUERY_MSTX_RANGE_WITH_DOMAIN.format(filter_statement)
diff --git a/profiler/msprof_analyze/version.txt b/profiler/msprof_analyze/version.txt
index 359a5b952d49f3592571e2af081510656029298e..da1561810140246b6a6971f317f6bbe9a2643ce2 100644
--- a/profiler/msprof_analyze/version.txt
+++ b/profiler/msprof_analyze/version.txt
@@ -1 +1 @@
-2.0.0
\ No newline at end of file
+8.1.0
\ No newline at end of file