diff --git a/profiler/advisor/analyzer/analyzer_controller.py b/profiler/advisor/analyzer/analyzer_controller.py
new file mode 100644
index 0000000000000000000000000000000000000000..a93c8e0acd9c499ccfff76dc0c9518a7e22af93b
--- /dev/null
+++ b/profiler/advisor/analyzer/analyzer_controller.py
@@ -0,0 +1,381 @@
+import copy
+import logging
+import json
+import sys
+import os
+from pathlib import Path
+
+sys.path.append(os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "compare_tools"))
+sys.path.append(os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "cluster_analyse"))
+
+from profiler.advisor.analyzer.cluster.slow_rank_analyzer import SlowRankAnalyzer
+from profiler.advisor.analyzer.cluster.slow_link_analyzer import SlowLinkAnalyzer
+from profiler.advisor.analyzer.overall.overall_summary_analyzer import OverallSummaryAnalyzer
+from profiler.advisor.common import constant as const
+from profiler.advisor.common.analyzer_scopes import SupportedScopes
+from profiler.advisor.interface.interface import Interface
+from profiler.cluster_analyse.cluster_data_preprocess.pytorch_data_preprocessor import PytorchDataPreprocessor
+
+logger = logging.getLogger()
+
+
+class AnalyzerController:
+ OBS_PREFIX = "obs://"
+ CLUSTER_RANK_THRESHOLD = 2
+
+ def __init__(self, dimensions, **kwargs):
+ self.dimensions = dimensions
+ self.kwargs = kwargs
+ self.slow_rank_analyzer = None
+ self.slow_link_analyzer = None
+ self.cluster_local_data_map = {}
+ self.default_rank_id = None
+ self.rank_id_map = {}
+ self._is_cluster = False
+
+ @staticmethod
+ def _get_step_rank_for_cluster_statistic_diff(target_cluster_statistic_data, benchmark_cluster_statistic_data,
+ headers, dimension, get_max=False):
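+        # Returns (step, rank_id) of the largest (or smallest, per get_max) target-benchmark gap,
+        # or (None, None) when the gap ratio is negligible.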
+ if dimension not in headers:
+ logger.error("Error dimension %s for cluster statistics data, optionals are %s.", dimension, headers)
+ return None, None
+
+ dimension_index = headers.index(dimension)
+ diff_record = []
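+        # assumes both datasets are row-aligned by (step, rank_id), as produced by format_details' sorting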
+ for target_row_data, benchmark_row_data in zip(target_cluster_statistic_data, benchmark_cluster_statistic_data):
+ target_data = target_row_data[dimension_index]
+ benchmark_data = benchmark_row_data[dimension_index]
+ if not isinstance(target_data, (int, float)) or not isinstance(benchmark_data, (int, float)):
+ continue
+ diff_record.append(target_data - benchmark_data)
+
+        if not diff_record:
+            return None, None
+
+        mean_diff = sum(diff_record) / len(diff_record)
+        if SlowRankAnalyzer.compute_max_gap_ratio(diff_record, mean_diff) < SlowRankAnalyzer.RATIO_THRESHOLD:
+            return None, None
+
+ value = max(diff_record) if get_max else min(diff_record)
+ value_index = diff_record.index(value)
+ step = target_cluster_statistic_data[value_index][headers.index("step")]
+ target_rank_id = target_cluster_statistic_data[value_index][headers.index("rank_id")]
+
+ return step, target_rank_id
+
+ @staticmethod
+ def _get_target_profiling_path_for_obs(profiling_path, rank_id):
+ # TODO
+ return None
+
+ def do_analysis(self):
+ result_list = []
+
+ profiling_path = self.kwargs.get("profiling_path")
+ benchmark_profiling_path = self.kwargs.get("benchmark_profiling_path")
+
+ if not self._check_profiling_path_valid(profiling_path):
+ logger.error("Got invalid argument '-d/--profiling_path' %s, skip analysis", profiling_path)
+ return
+ if benchmark_profiling_path and not self._check_profiling_path_valid(benchmark_profiling_path):
+ logger.error("Got invalid argument '-bp/--benchmark_profiling_path' %s, skip analysis",
+ benchmark_profiling_path)
+ return
+
+ self._is_cluster = self._is_cluster_profiling(profiling_path)
+ if not self._is_cluster:
+ job_list = self.single_rank_analysis(profiling_path, benchmark_profiling_path)
+ else:
+ job_list = self.cluster_analysis(profiling_path, benchmark_profiling_path)
+
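+        # run jobs in reverse registration order and render the HTML report only once, on the last job executed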
+ for i, (dimension, scope, interface, kwargs) in enumerate(job_list[::-1]):
+ result_list.append(
+ interface.get_result(dimension, scope, render_html=i == len(job_list) - 1, output_dict=False, **kwargs)
+ )
+
+ for result in result_list[::-1]:
+ if result and hasattr(result, "show"):
+ result.show()
+ break
+
+ def single_rank_analysis(self, profiling_path, benchmark_profiling_path=None):
+ job_list = []
+
+ profiling_path = self._get_profiling_path_by_rank(profiling_path)
+ benchmark_profiling_path = self._get_profiling_path_by_rank(benchmark_profiling_path)
+
+        # single-rank scenario: skip communication and cluster analysis
+ for dim in [Interface.COMMUNICATION, Interface.CLUSTER]:
+ if dim in self.dimensions:
+ self.dimensions.remove(dim)
+
+ for dimension in self.dimensions:
+ dimension_analysis_func_name = f"{dimension}_analysis"
+ if not hasattr(self, dimension_analysis_func_name):
+ continue
+ logger.info("Start %s analysis", dimension)
+ job_list += getattr(self, dimension_analysis_func_name)(profiling_path)
+
+ if benchmark_profiling_path:
+            # kernel/api comparison
+ job_list += self._single_profiling_comparison(profiling_path, benchmark_profiling_path)
+ else:
+            # single-rank performance breakdown
+ self.overall(profiling_path)
+ return job_list
+
+ def cluster_analysis(self, profiling_path, benchmark_profiling_path=None):
+ job_list = []
+
+        # single-cluster profiling analysis: scheduling, communication, computation, device/host memory
+ for dimension in self.dimensions:
+ dimension_analysis_func_name = f"cluster_{dimension}_analysis"
+ if not hasattr(self, dimension_analysis_func_name):
+ continue
+ logger.info("Start cluster %s analysis", dimension)
+ job_list += getattr(self, dimension_analysis_func_name)(profiling_path)
+
+ if benchmark_profiling_path:
+            # comparison analysis between two clusters' profiling data
+ job_list += self._cluster_profiling_comparison(profiling_path, benchmark_profiling_path)
+ else:
+            # TODO: top-k cluster analysis for computation/scheduling/communication
+ self.overall(profiling_path)
+ return job_list
+
+ def overall(self, profiling_path):
+ if self._is_cluster:
+ self.slow_rank_analyzer.optimize(template_key=Interface.OVERALL)
+ self.slow_link_analyzer.optimize(template_key=Interface.OVERALL)
+ else:
+ overall_analyzer = OverallSummaryAnalyzer(profiling_path)
+ overall_analyzer.optimize()
+
+ def schedule_analysis(self, profiling_path, benchmark_profiling_path=None, step=None, benchmark_step=None):
+        # scheduling (op dispatch) analysis for any single rank
+
+ kwargs = copy.deepcopy(self.kwargs)
+ job_list = []
+
+ kwargs["profiling_path"] = profiling_path
+ kwargs["benchmark_profiling_path"] = benchmark_profiling_path
+ kwargs["step"] = step
+ kwargs["benchmark_step"] = benchmark_step
+
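+        # each job is a (dimension, scope, interface, kwargs) tuple, consumed later by interface.get_result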
+ for dimension in [Interface.SCHEDULE, Interface.DATALOADER]:
+ for scope in Interface.get_scope(dimension):
+ interface = Interface(**kwargs)
+ job_list.append((dimension, scope, interface, kwargs))
+ return job_list
+
+ def computation_analysis(self, profiling_path, benchmark_profiling_path=None, step=None,
+ benchmark_step=None, stage=None):
+        # computation analysis for any single rank
+
+ kwargs = copy.deepcopy(self.kwargs)
+ kwargs["profiling_path"] = profiling_path
+ kwargs["benchmark_profiling_path"] = benchmark_profiling_path
+ kwargs["step"] = step
+ kwargs["benchmark_step"] = benchmark_step
+ kwargs["stage"] = stage
+ job_list = []
+
+ for dimension in [Interface.COMPUTATION]:
+ for scope in Interface.get_scope(dimension):
+ if scope == SupportedScopes.STAGE_COMPUTE:
+ continue
+ interface = Interface(**kwargs)
+ job_list.append((dimension, scope, interface, kwargs))
+ return job_list
+
+ def communication_analysis(self, profiling_path, benchmark_profiling_path=None, step=None, benchmark_step=None):
+        # communication analysis for any single rank
+ kwargs = copy.deepcopy(self.kwargs)
+ job_list = []
+        # TODO: per current discussion, this is mainly bandwidth-based analysis; it should also close the loop on communication retransmission, operator bandwidth anomaly detection, etc.
+ return job_list
+
+ def cluster_schedule_analysis(self, profiling_path):
+        # scheduling analysis of the target cluster's profiling data, excluding comparison between two clusters
+
+ job_list = []
+ global_step_rank = self.slow_rank_analyzer.get_global_step_rank(SlowRankAnalyzer.FREE)
+ slow_rank_id = global_step_rank.get("maximum", {}).get("rank_id") or self.default_rank_id
+ slow_step = global_step_rank.get("maximum", {}).get("step")
+ analysis_profiling_path = self._get_profiling_path_by_rank(profiling_path, slow_rank_id)
+
+ logger.info("Rank %s and step %s with max free", slow_rank_id, slow_step)
+
+ job_list += self.schedule_analysis(analysis_profiling_path, step=slow_step)
+ return job_list
+
+ def cluster_communication_analysis(self, profiling_path):
+        # communication analysis of the target cluster's profiling data, excluding comparison between two clusters
+
+ job_list = []
+
+        for bandwidth_type in [SlowLinkAnalyzer.SDMA, SlowLinkAnalyzer.RDMA]:
+            global_step_rank = self.slow_link_analyzer.get_global_step_rank(bandwidth_type)
+            # analyze the rank with the minimum bandwidth
+            target_rank_id = global_step_rank.get("minimum", {}).get("rank_id")
+            if target_rank_id is None:
+                target_rank_id = self.default_rank_id
+            step = global_step_rank.get("minimum", {}).get("step")
+            analysis_profiling_path = self._get_profiling_path_by_rank(profiling_path, target_rank_id)
+            logger.info("Rank %s and step %s with minimum %s bandwidth", target_rank_id, step, bandwidth_type)
+            job_list += self.communication_analysis(analysis_profiling_path, step=step)
+
+ return job_list
+
+ def cluster_computation_analysis(self, profiling_path):
+        # computation analysis of the target cluster's profiling data, excluding cross-cluster comparison; if pipeline-parallel stages exist, analyze each stage separately
+
+ job_list = []
+ global_step_rank = self.slow_rank_analyzer.get_global_step_rank(SlowRankAnalyzer.COMPUTE)
+ _ = self.slow_rank_analyzer.get_stage_step_rank(SlowRankAnalyzer.COMPUTE)
+
+        # TODO: analyze min/max per pipeline-parallel stage
+
+        # without distinguishing stages, analyze the min/max across all ranks
+        logger.info("Without pipeline parallel stages, global analysis steps and ranks are %s",
+                    json.dumps(global_step_rank))
+        slow_rank_id = global_step_rank.get("maximum", {}).get("rank_id")
+        if slow_rank_id is None:
+            slow_rank_id = self.default_rank_id
+ slow_step = global_step_rank.get("maximum", {}).get("step")
+        # if no benchmark rank id exists, there is no fast/slow rank issue; the default rank id is analyzed directly, so this stays None
+ fast_rank_id = global_step_rank.get("minimum", {}).get("rank_id")
+ fast_step = global_step_rank.get("minimum", {}).get("step")
+
+ logger.info("Max compute for rank %s with step %s, min compute for rank %s with step %s", slow_rank_id,
+ slow_step, fast_rank_id, fast_step)
+
+ job_list += self.computation_analysis(
+ self._get_profiling_path_by_rank(profiling_path, slow_rank_id),
+ self._get_profiling_path_by_rank(profiling_path, fast_rank_id),
+ slow_step,
+ fast_step
+ )
+
+ return job_list
+
+ def _single_profiling_comparison(self, profiling_path, benchmark_profiling_path, step=None,
+ benchmark_step=None):
+        # TODO: compare computation and scheduling based on compare_tools
+ kwargs = copy.deepcopy(self.kwargs)
+ return []
+
+ def _cluster_profiling_comparison(self, profiling_path, benchmark_profiling_path):
+        # compare cluster profiling data across three dimensions: computation, scheduling and communication
+
+ job_list = []
+ benchmark_profiling_path = self._get_profiling_path_by_rank(benchmark_profiling_path)
+ benchmark_slow_rank_analyzer = SlowRankAnalyzer(benchmark_profiling_path)
+ benchmark_slow_link_analyzer = SlowLinkAnalyzer(benchmark_profiling_path)
+
+        # computation and scheduling analysis
+ job_list += self._cluster_data_comparison(profiling_path,
+ benchmark_profiling_path,
+ self.slow_rank_analyzer,
+ benchmark_slow_rank_analyzer,
+ get_max=True)
+
+        # communication analysis
+ job_list += self._cluster_data_comparison(profiling_path,
+ benchmark_profiling_path,
+ self.slow_link_analyzer,
+ benchmark_slow_link_analyzer,
+ get_max=False)
+ return job_list
+
+ def _cluster_data_comparison(self, profiling_path, benchmark_profiling_path, target_cluster_analyzer,
+ benchmark_cluster_analyzer, get_max=False):
+        # compare slow rank / slow link results row by row, then run single-rank analysis on the rank and step with the largest difference
+ job_list = []
+
+ if isinstance(target_cluster_analyzer, SlowRankAnalyzer):
+ comparison_dims = [SlowRankAnalyzer.COMPUTE, SlowRankAnalyzer.FREE]
+ elif isinstance(target_cluster_analyzer, SlowLinkAnalyzer):
+ comparison_dims = [SlowLinkAnalyzer.SDMA, SlowLinkAnalyzer.RDMA]
+ else:
+ return job_list
+
+ target_data = target_cluster_analyzer.format_datas.get("data", [])
+ benchmark_data = benchmark_cluster_analyzer.format_datas.get("data", [])
+ headers = benchmark_cluster_analyzer.format_datas.get("headers", [])
+
+ if len(target_data) != len(benchmark_data):
+            logger.warning(
+                "Number of rank ids in benchmark profiling does not equal that of target profiling, skip cluster comparison.")
+ return job_list
+
+ for dimension in comparison_dims:
+ step_for_comparison, rank_id_for_comparison = AnalyzerController._get_step_rank_for_cluster_statistic_diff(
+ target_data,
+ benchmark_data,
+ headers,
+ dimension,
+ get_max=get_max
+ )
+            if rank_id_for_comparison is None:
+                continue
+
+            rank_profiling_path = self._get_profiling_path_by_rank(profiling_path, rank_id_for_comparison)
+ rank_benchmark_profiling_path = self._get_profiling_path_by_rank(
+ benchmark_profiling_path,
+ rank_id_for_comparison
+ )
+
+ job_list += self._single_profiling_comparison(
+ rank_profiling_path,
+ rank_benchmark_profiling_path,
+ step_for_comparison,
+ step_for_comparison
+ )
+ return job_list
+
+ def _is_cluster_profiling(self, profiling_path):
+        # determine whether this is a cluster scenario; for OBS paths PytorchDataPreprocessor cannot be called, so only the ascend_pt suffix can be checked
+
+ if not profiling_path.startswith(self.OBS_PREFIX):
+            # data on a local disk
+ path_list = [os.path.join(profiling_path, dir_name) for dir_name in os.listdir(profiling_path)]
+ ascend_pt_dirs = [path for path in path_list if os.path.isdir(path) and path.endswith("ascend_pt")]
+ data_processor = PytorchDataPreprocessor(ascend_pt_dirs)
+
+ self.cluster_local_data_map[profiling_path] = data_processor.get_data_map()
+
+ if not self.cluster_local_data_map or not self.cluster_local_data_map.get(profiling_path):
+ return False
+
+ self.default_rank_id = list(self.cluster_local_data_map[profiling_path].keys())[0]
+
+ self.slow_rank_analyzer = SlowRankAnalyzer(profiling_path)
+ self.slow_link_analyzer = SlowLinkAnalyzer(profiling_path)
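+        # treat the data as a cluster only when at least CLUSTER_RANK_THRESHOLD (2) ranks are present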
+ return len(self.cluster_local_data_map[profiling_path]) >= self.CLUSTER_RANK_THRESHOLD
+
+        # TODO: data on OBS
+ return False
+
+ def _get_profiling_path_by_rank(self, profiling_path, rank_id=None):
+        # given a cluster/single-rank profiling path and a rank id, return the profiling path of that rank; data on OBS is downloaded locally first
+
+ if profiling_path is None:
+ return profiling_path
+
+ if profiling_path.startswith(self.OBS_PREFIX):
+            # when profiling data is on OBS, download the rank's data locally and return the local path
+ return AnalyzerController._get_target_profiling_path_for_obs(profiling_path, rank_id)
+
+ return self._get_target_profiling_path_for_local(profiling_path, rank_id)
+
+ def _get_target_profiling_path_for_local(self, profiling_path, rank_id):
+ rank_id_map = self.cluster_local_data_map.get(profiling_path, {})
+ if rank_id is None or not rank_id_map:
+ return profiling_path
+
+ return rank_id_map.get(rank_id)
+
+ def _check_profiling_path_valid(self, profiling_path):
+        # check whether the profiling path exists; both OBS and local paths are supported
+
+ if profiling_path.startswith(self.OBS_PREFIX):
+ # TODO
+ return False
+ elif not Path(profiling_path).exists():
+ logger.error("Profiling path is not existed. Invalid profiling path: %s", profiling_path)
+ return False
+ else:
+ return True
+
+
diff --git a/profiler/advisor/analyzer/base_analyzer.py b/profiler/advisor/analyzer/base_analyzer.py
index 80368e1d60a14020637ba60bb41c5536dcf2e081..8938756115c89df272c0d542e97034d76850c2c6 100644
--- a/profiler/advisor/analyzer/base_analyzer.py
+++ b/profiler/advisor/analyzer/base_analyzer.py
@@ -63,7 +63,7 @@ class BaseAnalyzer(VersionControl, metaclass=ABCMeta):
return None
logger.info("Enable analysis %s with %s", self.__class__.__name__, ",".join(data_list))
- return func(self)
+ return func(self, **kwargs)
return wrapper
@@ -76,7 +76,7 @@ class BaseAnalyzer(VersionControl, metaclass=ABCMeta):
def init_dataset_list(self)->None:
dataset_cls_list = self.dataset_cls_list
if len(dataset_cls_list) == 0:
- logger.warning(f"Analyser: %s don't rely on any dataset!", self.__class__.__name__)
+ logger.warning(f"Analyzer: %s don't rely on any dataset!", self.__class__.__name__)
return
for dataset_cls in dataset_cls_list:
diff --git a/profiler/advisor/analyzer/cluster/slow_link_analyser.py b/profiler/advisor/analyzer/cluster/slow_link_analyzer.py
similarity index 57%
rename from profiler/advisor/analyzer/cluster/slow_link_analyser.py
rename to profiler/advisor/analyzer/cluster/slow_link_analyzer.py
index 0b585cbc7c5f136b15cd9eb035ea2dac5caa9e4e..bb2ba5f2a4092baafee7a034f48d7dcc516a0f0e 100644
--- a/profiler/advisor/analyzer/cluster/slow_link_analyser.py
+++ b/profiler/advisor/analyzer/cluster/slow_link_analyzer.py
@@ -15,11 +15,16 @@
from collections import defaultdict
from typing import Dict, List
+import logging
+
from profiler.advisor.analyzer.base_analyzer import BaseAnalyzer
from profiler.advisor.common import constant
from profiler.advisor.result.result import OptimizeResult
from profiler.advisor.result.item import OptimizeItem, OptimizeRecord
from profiler.advisor.dataset.cluster.cluster_dataset import ClusterCommunicationDataset
+from profiler.advisor.utils.utils import safe_index
+
+logger = logging.getLogger()
class SlowLinkAnalyzer(BaseAnalyzer):
@@ -34,7 +39,8 @@ class SlowLinkAnalyzer(BaseAnalyzer):
TRANSIT_SIZE = "Transit Size(MB)"
SDMA = "SDMA"
RDMA = "RDMA"
- SLOW_LINK_ANALYSIS = "slow_link_analysis"
+ SLOW_LINK_ANALYSIS = "slow link"
+ RATIO_THRESHOLD = 0.05
dataset_cls_list = [ClusterCommunicationDataset]
def __init__(self, collection_path, n_processes: int = 1, **kwargs):
@@ -45,18 +51,24 @@ class SlowLinkAnalyzer(BaseAnalyzer):
self.result = OptimizeResult()
self.bottelneck = ''
self.suggestion = ''
- self.format_datas = []
+ self.format_datas = self.format_details()
+
+ @staticmethod
+ def compute_max_gap_ratio(data: list, mean: float):
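+        # e.g. data=[90, 100, 110], mean=100 -> (110 - 90) / 100 = 0.2, well above RATIO_THRESHOLD (0.05)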
+ if mean == 0:
+ return 0
+ else:
+ return (max(data) - min(data)) / mean
def optimize(self, **kwargs):
if self.rank_bw_dict is None:
- print("Slow link analysis failed due to data loading failure. \
+ logger.error("Slow link analysis failed due to data loading failure. \
Please check your cluster_analysis_output folder. \
If you are not concerned about this type of data, please ignore this message.")
return self.result
self.process()
- self.format_datas = self.format_details()
self.make_record()
- self.make_render()
+        self.make_render(kwargs.get("template_key") or "cluster")
return self.result
def process(self):
@@ -69,7 +81,7 @@ class SlowLinkAnalyzer(BaseAnalyzer):
if len(data_list) > 0:
avg_bw = round(sum(data_list) / len(data_list), 3)
else:
- print("The slow link (identified bottleneck) cannot provide a bottleneck \
+ logger.info("The slow link (identified bottleneck) cannot provide a bottleneck \
because the analysis data is missing bandwidth information.")
return
self.bottelneck += f'{link_type}: \n' \
@@ -88,14 +100,19 @@ class SlowLinkAnalyzer(BaseAnalyzer):
details_dict = {}
headers = list({k for rank_bw_value in self.rank_bw_dict.values() for k in rank_bw_value.keys()})
headers.sort()
- data_list = [[rank_id] + [rank_bw.get(k, 0) for k in headers] for rank_id, rank_bw in self.rank_bw_dict.items()]
- data_list.sort(key = lambda x: x[0]) # 按rank_id排序
-
- details_dict["headers"] = ["rank_id"] + headers
+
+ data_list = []
+ for step_rank, rank_bw in self.rank_bw_dict.items():
+ step_rank_list = list(map(int, step_rank.split(constant.STEP_RANK_SEP)))
+ value_list = [rank_bw.get(i, 0) for i in headers]
+ data_list.append(step_rank_list + value_list)
+        data_list.sort(key=lambda x: (x[0], x[1]))  # sort by step, then rank_id
+
+ details_dict["headers"] = ["step", "rank_id"] + headers
details_dict["data"] = data_list
return details_dict
-
+
def make_record(self):
"""
make record for what and how to optimize
@@ -107,20 +124,65 @@ class SlowLinkAnalyzer(BaseAnalyzer):
)
self.result.add(OptimizeRecord(optimization_item))
- for i, data in enumerate(self.format_datas["data"]):
- self.result.add_detail(SlowLinkAnalyzer.SLOW_LINK_ANALYSIS, self.format_datas["headers"], data)
+ for data in self.format_datas.get("data", []):
+ self.result.add_detail(SlowLinkAnalyzer.SLOW_LINK_ANALYSIS, self.format_datas.get("headers", []), data)
- def make_render(self):
+ def make_render(self, template_key="cluster"):
result_for_html = {
- "Description" : self.bottelneck,
- "suggestion" : self.suggestion,
- "details" : [self.format_datas]
+ "Description": self.bottelneck,
+ "suggestion": self.suggestion,
+ "details": [self.format_datas]
}
- self.html_render.render_template(key="cluster",
+ self.html_render.render_template(key=template_key,
title=SlowLinkAnalyzer.SLOW_LINK_ANALYSIS,
template_dir="templates",
template_name="cluster_analysis.html",
cann_version=self.cann_version,
torch_version=self.torch_version,
- result=result_for_html)
\ No newline at end of file
+ result=result_for_html)
+
+    def get_global_step_rank(self, bandwidth_type):
+        global_step_rank = {}
+        bandwidth_key_map = {self.RDMA: self.RDMA_BANDWIDTH, self.SDMA: self.SDMA_BANDWIDTH}
+
+        if bandwidth_type not in bandwidth_key_map:
+            raise RuntimeError(f"Error bandwidth type {bandwidth_type}, options are {list(bandwidth_key_map.keys())}")
+
+        headers = self.format_datas.get("headers")
+
+        bandwidth_index = safe_index(headers, bandwidth_key_map.get(bandwidth_type))
+
+        if bandwidth_index is not None:
+            data_list = [tuple_list[bandwidth_index] for tuple_list in self.format_datas.get("data", [])]
+            if not data_list:
+                return global_step_rank
+            max_bandwidth, min_bandwidth = max(data_list), min(data_list)
+
+            if self.compute_max_gap_ratio(data_list, sum(data_list) / len(data_list)) < self.RATIO_THRESHOLD:
+                return global_step_rank
+
+ max_bandwidth_index = data_list.index(max_bandwidth)
+ min_bandwidth_index = data_list.index(min_bandwidth)
+
+ rank_id_index = safe_index(headers, "rank_id")
+ step_index = safe_index(headers, "step")
+
+ if rank_id_index is None:
+ return global_step_rank
+
+ max_bandwidth_rank_id = self.format_datas.get("data")[max_bandwidth_index][rank_id_index]
+ min_bandwidth_rank_id = self.format_datas.get("data")[min_bandwidth_index][rank_id_index]
+
+ if step_index is None:
+ max_bandwidth_step, min_bandwidth_step = constant.DEFAULT_STEP, constant.DEFAULT_STEP
+ else:
+ max_bandwidth_step = self.format_datas.get("data")[max_bandwidth_index][step_index]
+ min_bandwidth_step = self.format_datas.get("data")[min_bandwidth_index][step_index]
+
+ global_step_rank["maximum"] = {"rank_id": max_bandwidth_rank_id, "step": max_bandwidth_step}
+ global_step_rank["minimum"] = {"rank_id": min_bandwidth_rank_id, "step": min_bandwidth_step}
+
+ return global_step_rank
+
+ def get_priority(self):
+ pass
\ No newline at end of file
diff --git a/profiler/advisor/analyzer/cluster/slow_rank_analyser.py b/profiler/advisor/analyzer/cluster/slow_rank_analyzer.py
similarity index 43%
rename from profiler/advisor/analyzer/cluster/slow_rank_analyser.py
rename to profiler/advisor/analyzer/cluster/slow_rank_analyzer.py
index f439b31f7736ee4777d5ef10bf968738a76ae1b3..fb81883fee4685fa8c76281ace4a9b3edb204312 100644
--- a/profiler/advisor/analyzer/cluster/slow_rank_analyser.py
+++ b/profiler/advisor/analyzer/cluster/slow_rank_analyzer.py
@@ -13,43 +13,61 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from collections import defaultdict
-from typing import Dict, List
+import logging
+
from profiler.advisor.analyzer.base_analyzer import BaseAnalyzer
from profiler.advisor.common import constant
from profiler.advisor.result.result import OptimizeResult
from profiler.advisor.result.item import OptimizeItem, OptimizeRecord
from profiler.advisor.dataset.cluster.cluster_dataset import ClusterStepTraceTimeDataset
+from profiler.advisor.utils.utils import safe_index
+
+logger = logging.getLogger()
class SlowRankAnalyzer(BaseAnalyzer):
- SLOW_RANK_ANALYSIS = "slow_rank_analysis"
+ SLOW_RANK_ANALYSIS = "slow rank"
RANK = "rank"
RATIO_THRESHOLD = 0.05
BOTTLENECK_LIST = ['Computing', 'Communication', "Free"]
dataset_cls_list = [ClusterStepTraceTimeDataset]
+ COMPUTE = "compute(us)"
+ FREE = "free(us)"
+ COMMUNICATION = "communication(us)"
def __init__(self, collection_path, n_processes: int = 1, **kwargs):
super().__init__(collection_path, n_processes, **kwargs)
key = ClusterStepTraceTimeDataset.get_key()
- self.step_trace_class = self.get_first_data_by_key(self.dataset_list, key)
+ self.step_trace_class = self.get_first_data_by_key(self.dataset_list, key)
self.step_trace_dict = self.step_trace_class.get_data()
+ self.stages = self.step_trace_class.get_stages()
self.result = OptimizeResult()
self.bottelneck = ''
self.suggestion = ''
- self.format_datas = []
+ self._steps = set()
+ self.format_datas = self.format_details()
+
+ @property
+ def steps(self):
+ return sorted(list(self._steps))
+
+ @staticmethod
+ def compute_max_gap_ratio(data: list, mean: float):
+ if mean == 0:
+ return 0
+ else:
+ return (max(data) - min(data)) / mean
def optimize(self, **kwargs):
if self.step_trace_dict is None:
- print("slow_rank 分析失败,原因是数据加载失败,请检查你的cluster_analysis_outpu文件夹 \
+ logger.error("slow_rank 分析失败,原因是数据加载失败,请检查你的cluster_analysis_outpu文件夹 \
如不关心这类数据请忽略")
return self.result
self.process()
- self.format_datas = self.format_details()
self.make_record()
- self.make_render()
+        self.make_render(kwargs.get("template_key") or "cluster")
return self.result
-
+
def process(self):
total_time_list = [sum(data_tuple) for rank_id, data_tuple in self.step_trace_dict.items()]
if total_time_list:
@@ -57,6 +75,9 @@ class SlowRankAnalyzer(BaseAnalyzer):
for i in range(len(self.BOTTLENECK_LIST)):
self.produce_bottleneck(self.step_trace_dict, i, mean_total_time)
+ if not self.bottelneck:
+ self.bottelneck = "There is no slow rank issues"
+
def produce_bottleneck(self, step_dict: dict, produce_type: int, mean_total_time: float):
data_list = [data_tuple[produce_type] for rank_id, data_tuple in step_dict.items()]
max_ratio = self.compute_max_gap_ratio(data_list, mean_total_time)
@@ -70,33 +91,41 @@ class SlowRankAnalyzer(BaseAnalyzer):
"""
make record for what and how to optimize
"""
+
optimization_item = OptimizeItem(
SlowRankAnalyzer.SLOW_RANK_ANALYSIS,
self.bottelneck,
self.suggestion
)
self.result.add(OptimizeRecord(optimization_item))
- for i, data in enumerate(self.format_datas["data"]):
- self.result.add_detail(SlowRankAnalyzer.SLOW_RANK_ANALYSIS, self.format_datas["headers"], data)
+
+ data_list = self.format_datas.get("data", [])
+ headers = self.format_datas.get("headers", [])
+ for data in data_list:
+ self.result.add_detail(SlowRankAnalyzer.SLOW_RANK_ANALYSIS, headers, data)
def format_details(self):
details_dict = {}
- headers = ["rank_id", "compute(us)", "communication(us)", "free(us)"]
+ headers = ["step", "rank_id", "compute(us)", "communication(us)", "free(us)"]
data_list = []
- for key,value in self.step_trace_dict.items():
- data_list.append([key] + value)
+ for key, value in self.step_trace_dict.items():
+ step, rank_id = key.split(constant.STEP_RANK_SEP)
+ data_list.append([int(step), int(rank_id)] + value)
+            if step:
+                self._steps.add(int(step))
+
details_dict["headers"] = headers
- details_dict["data"] = data_list
+ details_dict["data"] = sorted(data_list, key=lambda x: (x[0], x[1]))
return details_dict
- def make_render(self):
+ def make_render(self, template_key="cluster"):
result_for_html = {
- "Description" : self.bottelneck,
- "suggestion" : self.suggestion,
- "details" : [self.format_datas]
+ "Description": self.bottelneck,
+ "suggestion": self.suggestion,
+ "details": [self.format_datas]
}
- self.html_render.render_template(key="cluster",
+ self.html_render.render_template(key=template_key,
title=SlowRankAnalyzer.SLOW_RANK_ANALYSIS,
template_dir="templates",
template_name="cluster_analysis.html",
@@ -104,9 +133,83 @@ class SlowRankAnalyzer(BaseAnalyzer):
torch_version=self.torch_version,
result=result_for_html)
- @staticmethod
- def compute_max_gap_ratio(data: list, mean: float):
- if mean == 0:
- return 0
+ def get_global_step_rank(self, dimension):
+ global_step_rank = {}
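+        # returns e.g. {"maximum": {"rank_id": 7, "step": 2}, "minimum": {"rank_id": 0, "step": 2}}; empty when the gap is below threshold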
+
+ headers = self.format_datas.get("headers")
+
+ dimension_index = safe_index(headers, dimension)
+ rank_id_index = safe_index(headers, "rank_id")
+ step_index = safe_index(headers, "step")
+ if dimension_index is None or rank_id_index is None:
+ return global_step_rank
+
+        data_list = [tuple_list[dimension_index] for tuple_list in self.format_datas.get("data")]
+        if not data_list:
+            return global_step_rank
+        max_time, min_time = max(data_list), min(data_list)
+
+        if self.compute_max_gap_ratio(data_list, sum(data_list) / len(data_list)) < self.RATIO_THRESHOLD:
+            return global_step_rank
+ max_time_index = data_list.index(max_time)
+ min_time_index = data_list.index(min_time)
+
+ max_time_rank_id = self.format_datas.get("data")[max_time_index][rank_id_index]
+ min_time_rank_id = self.format_datas.get("data")[min_time_index][rank_id_index]
+
+ if step_index is not None:
+ max_time_step = self.format_datas.get("data")[max_time_index][step_index]
+ min_time_step = self.format_datas.get("data")[min_time_index][step_index]
else:
- return (max(data) - min(data)) / mean
+ max_time_step, min_time_step = constant.DEFAULT_STEP, constant.DEFAULT_STEP
+
+ global_step_rank["maximum"] = {"rank_id": max_time_rank_id, "step": max_time_step}
+ global_step_rank["minimum"] = {"rank_id": min_time_rank_id, "step": min_time_step}
+
+ return global_step_rank
+
+ def get_stage_step_rank(self, dimension):
+ stage_step_rank = {}
+
+ headers = self.format_datas.get("headers")
+ dimension_index = safe_index(headers, dimension)
+ rank_id_index = safe_index(headers, "rank_id")
+ step_index = safe_index(headers, "step")
+ if dimension_index is None or rank_id_index is None:
+ return stage_step_rank
+
+ rank_list = [tuple_list[rank_id_index] for tuple_list in self.format_datas.get("data")]
+ cost_time_list = [tuple_list[dimension_index] for tuple_list in self.format_datas.get("data")]
+
+ if step_index is not None:
+ step_list = [tuple_list[step_index] for tuple_list in self.format_datas.get("data")]
+ else:
+ step_list = [constant.DEFAULT_STEP] * len(rank_list)
+
+ for index, stage in enumerate(self.stages):
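+            # per pipeline-parallel stage, locate the slowest (max) and fastest (min) rank for this dimension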
+ tmp_step_list, tmp_rank_list, tmp_time_list = [], [], []
+ for step, rank_id, time in zip(step_list, rank_list, cost_time_list):
+ if rank_id not in stage:
+ continue
+
+ tmp_step_list.append(step)
+ tmp_rank_list.append(rank_id)
+ tmp_time_list.append(time)
+
+            if not tmp_time_list:
+                continue
+
+            if self.compute_max_gap_ratio(tmp_time_list, sum(tmp_time_list) / len(tmp_time_list)) < self.RATIO_THRESHOLD:
+                continue
+
+ max_time, min_time = max(tmp_time_list), min(tmp_time_list)
+ max_time_index, min_time_index = tmp_time_list.index(max_time), tmp_time_list.index(min_time)
+
+ stage_key = f"stage-{index}"
+ stage_step_rank[stage_key] = {}
+ stage_step_rank[stage_key]["maximum"] = {"rank_id": tmp_rank_list[max_time_index],
+ "step": tmp_step_list[max_time_index]}
+ stage_step_rank[stage_key]["minimum"] = {"rank_id": tmp_rank_list[min_time_index],
+ "step": tmp_step_list[min_time_index]}
+
+ return stage_step_rank
+
+ def get_priority(self):
+ pass
\ No newline at end of file
diff --git a/profiler/advisor/analyzer/computation/ai_core_freq/ai_core_freq_analyzer.py b/profiler/advisor/analyzer/computation/ai_core_freq/ai_core_freq_analyzer.py
index 4f25deff7c0cdb415ccae6ab748304d4044c5eec..52e5594bf0f19154270a1e0aa073b534c703e2cd 100644
--- a/profiler/advisor/analyzer/computation/ai_core_freq/ai_core_freq_analyzer.py
+++ b/profiler/advisor/analyzer/computation/ai_core_freq/ai_core_freq_analyzer.py
@@ -26,11 +26,10 @@ class AICoreFreqAnalyzer(BaseAnalyzer):
if not Config().get_config("aic_frequency"):
logger.warning("Can not find ai core frequency in info.json*, please check data integrity.")
return self.result
+
add_render_list = kwargs.get("add_render_list", True)
ai_core_freq_checker = AICoreFreqChecker()
- ai_core_freq_checker.check_ai_core_freq(self.dataset)
- if not ai_core_freq_checker.ai_core_freq_issues:
- return self.result
+ ai_core_freq_checker.check_ai_core_freq(self.dataset, rank_id=kwargs.get("rank_id"), stage=kwargs.get("stage"))
ai_core_freq_checker.make_record(self.result)
self.html = ai_core_freq_checker.make_render(self.html_render, add_render_list)
return self.result
diff --git a/profiler/advisor/analyzer/computation/ai_core_freq/ai_core_freq_checker.py b/profiler/advisor/analyzer/computation/ai_core_freq/ai_core_freq_checker.py
index 7afa09cca48fd9939c4fcbfdf2a9fb5f29e3b468..28155207e3a4c6ccce70687ee2561c6ecbb80876 100644
--- a/profiler/advisor/analyzer/computation/ai_core_freq/ai_core_freq_checker.py
+++ b/profiler/advisor/analyzer/computation/ai_core_freq/ai_core_freq_checker.py
@@ -61,6 +61,8 @@ class AICoreFreqChecker:
self.decrease_freq_ops.sort(key=
lambda x: (x[self.TOTAL_DURATION_INDEX], x[self.DECREASE_FREQ_RATIO_INDEX]),
reverse=True)
+ if not self.ai_core_freq_issues:
+ return
self.desc = (f"{len(self.decrease_freq_ops)} operators are found during frequency reduction, and the reduction "
f"ratio is larger than {self.DECREASE_FREQ_RATIO}.")
@@ -72,22 +74,27 @@ class AICoreFreqChecker:
"""
make record for what and how to optimize
"""
- optimization_item = OptimizeItem("AI Core Frequency", self.desc, [self.suggestions])
+ if not self.ai_core_freq_issues:
+ return
+
+ sheet_name = "AI Core Frequency"
+ if self.rank_id is not None:
+ sheet_name = f"rank {self.rank_id} AI Core Frequency".capitalize()
+
+ optimization_item = OptimizeItem(sheet_name, self.desc, [self.suggestions])
result.add(OptimizeRecord(optimization_item))
self.headers = ["Operator name", "Count", "Total duration(us)", "AI CORE frequency decreased ratio",
"Average frequency", "Max frequency", "Min frequency"]
- if self.rank_id:
- self.headers = ["Rank id"] + self.headers
- sub_table_name = "AI Core Frequency" if not self.stage else f"Stage-{self.stage}: AI Core Frequency"
- result.add_detail(sub_table_name, headers=self.headers)
+ result.add_detail(sheet_name, headers=self.headers)
for row in self.decrease_freq_ops:
- if self.rank_id:
- row = [self.rank_id] + row
- result.add_detail(sub_table_name, detail=row)
+ result.add_detail(sheet_name, detail=row)
def make_render(self, html_render, add_render_list=True):
+ if not self.ai_core_freq_issues:
+ return None
+
if self.SHOW_TOPK_OPS:
self.desc += f" Only show {self.SHOW_TOPK_OPS} operators here, see latest mstt_advisor.xlsx for details."
return html_render.render_template(key="computation",
diff --git a/profiler/advisor/analyzer/computation/aicpu/aicpu_checker.py b/profiler/advisor/analyzer/computation/aicpu/aicpu_checker.py
index 0caede4b894e0dda15333e6d3a480fa943c66323..05637cb06f098c45ba02a94c26061e62da4d05a1 100644
--- a/profiler/advisor/analyzer/computation/aicpu/aicpu_checker.py
+++ b/profiler/advisor/analyzer/computation/aicpu/aicpu_checker.py
@@ -145,11 +145,14 @@ class AicpuChecker(OperatorChecker):
",".join(double_type_ai_cpu_operator)))
return True
- def make_render(self, html_render, record):
- html_render.render_template(key="computation",
- template_dir="templates",
- template_name="operator_ai_cpu.html",
- format_result=self.format_operator_result(record, constant.OPERATOR_LIST_UNLIMIT))
+ def make_render(self, html_render, record, add_render_list=True):
+
+ return html_render.render_template(key="computation",
+ template_dir="templates",
+ template_name="operator_ai_cpu.html",
+ format_result=self.format_operator_result(record,
+ constant.OPERATOR_LIST_UNLIMIT),
+ add_render_list=add_render_list)
def format_operator_result(self, record, limit):
"""
diff --git a/profiler/advisor/analyzer/computation/bound/block_dim_checker.py b/profiler/advisor/analyzer/computation/bound/block_dim_checker.py
index 7a873c65635fcc8f2ebb35c8d317de09d78da491..f04c668ab2f5b0aea89681641021bcdda7bf7e91 100644
--- a/profiler/advisor/analyzer/computation/bound/block_dim_checker.py
+++ b/profiler/advisor/analyzer/computation/bound/block_dim_checker.py
@@ -45,11 +45,13 @@ class BlockDimChecker(OperatorChecker):
"task duration are as follows:\n"
return True
- def make_render(self, html_render, record):
- html_render.render_template(key="computation",
- template_dir="templates",
- template_name="operator_block_dim.html",
- format_result=self.format_operator_result(record, constant.OPERATOR_OUT_TOPK))
+ def make_render(self, html_render, record, add_render_list=True):
+ return html_render.render_template(key="computation",
+ template_dir="templates",
+ template_name="operator_block_dim.html",
+ format_result=self.format_operator_result(record,
+ constant.OPERATOR_OUT_TOPK),
+ add_render_list=add_render_list)
def _check_operator(self, op_info) -> bool:
if op_info.task_type not in ["AI_CORE", "AI_VECTOR_CORE", "MIX_AIC"]:
diff --git a/profiler/advisor/analyzer/computation/bound/operator_bound_checker.py b/profiler/advisor/analyzer/computation/bound/operator_bound_checker.py
index a22b380f974b14207d6d7be262cd49f0ba0fbe99..9e4179425dfc57260541ea0301dae7174b2c7951 100644
--- a/profiler/advisor/analyzer/computation/bound/operator_bound_checker.py
+++ b/profiler/advisor/analyzer/computation/bound/operator_bound_checker.py
@@ -46,8 +46,10 @@ class OperatorBoundChecker(OperatorChecker):
return False
return True
- def make_render(self, html_render, record):
- html_render.render_template(key="computation",
- template_dir="templates",
- template_name="operator_no_bound.html",
- format_result=self.format_operator_result(record, constant.OPERATOR_OUT_TOPK))
+ def make_render(self, html_render, record, add_render_list=True):
+ return html_render.render_template(key="computation",
+ template_dir="templates",
+ template_name="operator_no_bound.html",
+ format_result=self.format_operator_result(record,
+ constant.OPERATOR_OUT_TOPK),
+ add_render_list=add_render_list)
diff --git a/profiler/advisor/analyzer/computation/op_compile/dynamic_shape_checker.py b/profiler/advisor/analyzer/computation/op_compile/dynamic_shape_checker.py
index 86d3bac4ff8cb163d23a6365307b855839b12a6a..916534434f143dbacf1488cd88b20b58c26eb5b2 100644
--- a/profiler/advisor/analyzer/computation/op_compile/dynamic_shape_checker.py
+++ b/profiler/advisor/analyzer/computation/op_compile/dynamic_shape_checker.py
@@ -27,11 +27,13 @@ class DynamicShapeChecker(OperatorChecker):
def check(self, profiling_database) -> bool:
return self.is_dynamic_shape(profiling_database)
- def make_record(self, profiling_database) -> OptimizeRecord:
+ def make_record(self, profiling_database, rank_id=None) -> OptimizeRecord:
"""
make record for what and how to optimize
"""
+ if rank_id is not None:
+ self._PROBLEM = f"rank {rank_id} ".capitalize() + self._PROBLEM.lower()
optimization_item = OptimizeItem(
self._PROBLEM,
self._description,
@@ -58,8 +60,9 @@ class DynamicShapeChecker(OperatorChecker):
format_result = {"record": record.__dict__, "suggestion": '
'.join(release_suggestion_list)}
return format_result
- def make_render(self, html_render, record):
- html_render.render_template(key="computation",
- template_dir="templates",
- template_name="operator_dynamic_shape.html",
- format_result=self.format_operator_result(record))
+ def make_render(self, html_render, record, add_render_list=True):
+ return html_render.render_template(key="computation",
+ template_dir="templates",
+ template_name="operator_dynamic_shape.html",
+ format_result=self.format_operator_result(record),
+ add_render_list=add_render_list)
diff --git a/profiler/advisor/analyzer/computation/operator_checker.py b/profiler/advisor/analyzer/computation/operator_checker.py
index 64618b56a8df7f380277e99ae7ca47cd69d24648..908659643baab3754904530a117f15a23ded50b6 100644
--- a/profiler/advisor/analyzer/computation/operator_checker.py
+++ b/profiler/advisor/analyzer/computation/operator_checker.py
@@ -40,6 +40,23 @@ class OperatorChecker(VersionControl):
self.cann_version = cann_version
self._op_list: List[OpInfo] = []
+ @classmethod
+ def get_name(cls):
+ """
+ get name of checker
+ :return: checker name
+ """
+ return cls._PROBLEM
+
+ @staticmethod
+ def get_ratio(op_info: OpInfo, attr: str) -> float:
+ if not op_info.has_attr(attr):
+ return 0
+ value = op_info.get_attr(attr)
+ if not value or value == "N/A":
+ return 0
+ return float(value)
+
def check(self, profiling_data: ProfilingDataset) -> bool:
"""
check if any operator need optimize
@@ -77,12 +94,16 @@ class OperatorChecker(VersionControl):
return True
return False
- def make_record(self, profiling_data: ProfilingDataset):
+ def make_record(self, profiling_data: ProfilingDataset, rank_id=None):
"""
Make record for what and how to optimize
:param profiling_data: profiling data
:return: optimize record
"""
+
+ if rank_id is not None:
+ self._PROBLEM = f"rank {rank_id} ".capitalize() + self._PROBLEM.lower()
+
task_duration_list = [float(op_info.get_attr("task_duration")) for op_info in self._op_list if
hasattr(op_info, "get_attr")]
total_cost_time = sum(task_duration_list)
@@ -239,14 +260,6 @@ class OperatorChecker(VersionControl):
"""Get node views."""
return []
- @classmethod
- def get_name(cls):
- """
- get name of checker
- :return: checker name
- """
- return cls._PROBLEM
-
def get_incomes(self) -> float:
"""get incomes"""
incomes = 0.0
@@ -270,15 +283,6 @@ class OperatorChecker(VersionControl):
return False
return True
- @staticmethod
- def get_ratio(op_info: OpInfo, attr: str) -> float:
- if not op_info.has_attr(attr):
- return 0
- value = op_info.get_attr(attr)
- if not value or value == "N/A":
- return 0
- return float(value)
-
def get_details(self) -> list:
"""
get details of operator to be optimized
diff --git a/profiler/advisor/analyzer/computation/profiling_analyzer.py b/profiler/advisor/analyzer/computation/profiling_analyzer.py
index 2021bcd5765d1df7489f202b3453a83924fb28dc..d9d59ca0092fd451bff2a36680ab029eec573213 100644
--- a/profiler/advisor/analyzer/computation/profiling_analyzer.py
+++ b/profiler/advisor/analyzer/computation/profiling_analyzer.py
@@ -22,6 +22,7 @@ class ProfilingAnalyzer(BaseAnalyzer, ABC):
self.checker = OperatorChecker(self.cann_version)
self.html_render = HTMLRender()
self.result = OptimizeResult()
+ self.html = None
@BaseAnalyzer.check_data((ProfilingDataset.get_key(),))
def optimize(self, **kwargs) -> OptimizeResult:
@@ -32,22 +33,28 @@ class ProfilingAnalyzer(BaseAnalyzer, ABC):
"""
profiling_data = self.get_first_data_by_key(self.dataset_list, ProfilingDataset.get_key())
checker = self.checker
+ rank_id = kwargs.get("rank_id")
+
+ add_render_list = kwargs.get("add_render_list", True)
+
if not checker.pre_check(profiling_data):
return self.result
if checker.check(profiling_data):
# add record
- record = checker.make_record(profiling_data)
- checker.make_render(self.html_render, record)
+ record = checker.make_record(profiling_data, rank_id)
+ self.html = checker.make_render(self.html_render, record, add_render_list)
self.result.add(record)
# add details
details = checker.get_details()
if details:
for i, detail in enumerate(details):
+ sheet_name = checker.get_name() if rank_id is None else \
+ f"rank {rank_id} ".capitalize() + checker.get_name()
if i == 0:
# the first row is header
- self.result.add_detail(checker.get_name(), headers=detail)
+ self.result.add_detail(sheet_name, headers=detail)
else:
- self.result.add_detail(checker.get_name(), detail=detail)
+ self.result.add_detail(sheet_name, detail=detail)
# add tune op list
tune_op_list = checker.get_tune_op_list()
if tune_op_list:
diff --git a/profiler/advisor/analyzer/graph_fusion/graph_fusion_analyzer.py b/profiler/advisor/analyzer/graph_fusion/graph_fusion_analyzer.py
index 326be83b8d49088b1563ccd8c08b68a4aa3001ef..3cbf5f76e1d0e86feab170e1547d47c523cf0280 100644
--- a/profiler/advisor/analyzer/graph_fusion/graph_fusion_analyzer.py
+++ b/profiler/advisor/analyzer/graph_fusion/graph_fusion_analyzer.py
@@ -20,17 +20,19 @@ class FusionOPAnalyzer(BaseAnalyzer):
super(FusionOPAnalyzer, self).__init__(collection_path, **kwargs)
self.result = OptimizeResult()
self.html_render = HTMLRender()
-
+ self.html = None
+
@BaseAnalyzer.check_data((GraphDataset.get_key(),))
def optimize(self, **kwargs):
"""
:return: result
"""
- self._check(self.dataset_list.get("GraphDataset"), self.dataset_list.get("ProfilingDataset"))
+ self._check(self.dataset_list.get("GraphDataset"), self.dataset_list.get("ProfilingDataset"),
+                    kwargs.get("add_render_list", True))
return self.result
- def _check(self, graph_data: List[GraphDataset],
- profiling_data: List[ProfilingDataset] = None) -> None:
+ def _check(self, graph_data: List[GraphDataset], profiling_data: List[ProfilingDataset] = None,
+ add_render_list=True) -> None:
if len(graph_data) == 0 or graph_data[0].is_empty():
return
for _, rule in self.RULES.items():
@@ -40,10 +42,4 @@ class FusionOPAnalyzer(BaseAnalyzer):
else:
checker.find_fusion_matched_issues_with_times(graph_data, profiling_data)
checker.make_record(self.result)
- checker.make_render(self.html_render)
-
- def make_record(self):
- pass
-
- def make_render(self):
- pass
+ self.html = checker.make_render(self.html_render, add_render_list)
\ No newline at end of file
diff --git a/profiler/advisor/analyzer/overall/overall_summary_analyzer.py b/profiler/advisor/analyzer/overall/overall_summary_analyzer.py
index 8e93dbda77d4915e716af856114184324d1d8807..8c328fe07e5a4acec4cc0d931b2f234ca6470835 100644
--- a/profiler/advisor/analyzer/overall/overall_summary_analyzer.py
+++ b/profiler/advisor/analyzer/overall/overall_summary_analyzer.py
@@ -23,7 +23,7 @@ from profiler.compare_tools.compare_interface.comparison_interface import Compar
class OverallSummaryAnalyzer(BaseAnalyzer):
- OVERALL_SUMMARY_ANALYZER = "overall_summary_analysis"
+ OVERALL_SUMMARY_ANALYZER = "overall summary"
advice_map = {
"Computing Time": "if you want more detailed advice please go to mstt_advisor_*.html",
"Uncovered Communication Time": "if you want more detailed advice please go to mstt_advisor_*.html",
diff --git a/profiler/advisor/analyzer/schedule/fusion_ops/fusion_ops_analyzer.py b/profiler/advisor/analyzer/schedule/fusion_ops/fusion_ops_analyzer.py
index c1eb24b8e1e11ac167a7eb9333867167a57dd524..1a6dd194f80d776ccfee405dfb3a790e1d9e8d51 100644
--- a/profiler/advisor/analyzer/schedule/fusion_ops/fusion_ops_analyzer.py
+++ b/profiler/advisor/analyzer/schedule/fusion_ops/fusion_ops_analyzer.py
@@ -154,8 +154,9 @@ class TimelineFusionOpsAnalyzer(BaseAnalyzer):
timeline_profiling_doc_url=const.TIMELINE_WITH_STACK_DOC_URL
)
+ sheet_name = "Affinity apis"
optimization_item = OptimizeItem(
- SupportedScopes.TIMELINE_FUSION_OPS,
+ sheet_name,
desc,
[suggestion]
)
@@ -163,16 +164,16 @@ class TimelineFusionOpsAnalyzer(BaseAnalyzer):
self.result.add(OptimizeRecord(optimization_item))
record_title = ["Affinity API", "Code stacks", "Stack called counts"]
- self.result.add_detail(SupportedScopes.TIMELINE_FUSION_OPS, headers=record_title)
+ self.result.add_detail(sheet_name, headers=record_title)
for api_name, stacks_info in format_timeline_result(self.matched_op_stacks).items():
if not stacks_info:
detail = [api_name, "null", "null"]
- self.result.add_detail(SupportedScopes.TIMELINE_FUSION_OPS, detail=detail)
+ self.result.add_detail(sheet_name, detail=detail)
else:
for stack in stacks_info:
detail = [api_name, *stack]
- self.result.add_detail(SupportedScopes.TIMELINE_FUSION_OPS, detail=detail)
+ self.result.add_detail(sheet_name, detail=detail)
def make_render(self):
format_result_for_html = format_timeline_result(dict(self.matched_op_stacks), dump_html=True)
diff --git a/profiler/advisor/common/constant.py b/profiler/advisor/common/constant.py
index c97cfbfd11e27a3d83ea2f9a25ea7870899bcfd1..53b5a64e26d9eb148d73ae1dd39f7c767975b524 100644
--- a/profiler/advisor/common/constant.py
+++ b/profiler/advisor/common/constant.py
@@ -33,6 +33,7 @@ TASK_TYPE = "Task Type"
CPU_OP = "cpu_op"
AI_CORE = "AI_CORE"
AI_CPU = "AI_CPU"
+MIX_AIC = "MIX_AIC"
CALL_STACKS = "Call stack"
INPUT_DIMS = "Input Dims"
OP_SEP = "-"
@@ -48,8 +49,7 @@ NO_STACK_REASON_MAP = {
TIMELINE_BACKWARD_NO_STACK_CODE: "Backward broadcast, without call stacks in profiling.",
TIMELINE_ACL_TO_NPU_NO_STACK_CODE: "Incoming flow is 'acl_to_npu', without call stacks in profiling."
}
-TIMELINE_API_DOC_URL = "https://gitee.com/ascend/mstt/blob/master/profiler/advisor/doc/"\
- "Samples%20of%20Fused%20Operator%20API%20Replacement.md"
+TIMELINE_API_DOC_URL = "https://gitee.com/ascend/mstt/blob/master/profiler/advisor/doc/Samples%20of%20Fused%20Operator%20API%20Replacement.md"
AFFINITY_TRAINING_API = "Affinity training api"
TIMELINE_WITH_STACK_DOC_URL = "https://www.hiascend.com/document/detail/zh/canncommercial/" \
"70RC1/modeldevpt/ptmigr/AImpug_0067.html"
@@ -156,7 +156,17 @@ COMMUNICATION_JSON = "communication.json"
BOTTLENECK = "bottleneck"
DATA = "data"
-
+ADVISOR_ANALYSIS_OUTPUT_DIR = "advisor_analysis_result"
+DEFAULT_PROCESSES = 8
+CLUSTER_ANALYSIS_FILE_PATTERN = [r'profiler_info_\d+\.json', "step_trace_time.csv", "communication.json",
+ "communication_matrix.json"]
+ANALYSIS_OUTPUT_PATH = "ANALYSIS_OUTPUT_PATH"
+DEFAULT_RANK_FOR_PROFILING_ANALYSIS = 0
+PROFILER_INFO_FILE_PATTERN = r"profiler_info_(\d+)\.json"
FRAMEWORK_STACK_BLACK_LIST = ["torch", "torch_npu", "megatron", "deepspeed"]
DISABLE_STREAMING_READER = "DISABLE_STREAMING_READER"
MAX_FILE_SIZE = 10**10
+MAX_NUM_PROCESSES = 4
+DEFAULT_STEP = "-1"
+STEP_RANK_SEP = "_"
\ No newline at end of file
diff --git a/profiler/advisor/dataset/cluster/cluster_dataset.py b/profiler/advisor/dataset/cluster/cluster_dataset.py
index b4956139c58436f6998ea8ce94a56fc280c038c3..cd750796241f129b41d07552a60a7baa1f882a57 100644
--- a/profiler/advisor/dataset/cluster/cluster_dataset.py
+++ b/profiler/advisor/dataset/cluster/cluster_dataset.py
@@ -15,6 +15,7 @@
import logging
import os
+import re
from profiler.advisor.dataset.dataset import Dataset
from profiler.advisor.utils.utils import singleton
@@ -79,9 +80,11 @@ class ClusterDataset(Dataset):
@singleton
class ClusterStepTraceTimeDataset(ClusterDataset):
RANK = "rank"
+ STAGE = "stage"
def __init__(self, collection_path: str, data: dict, **kwargs):
self._step_dict = defaultdict()
+ self._stages = []
super().__init__(collection_path, data)
def _parse(self):
@@ -99,14 +102,31 @@ class ClusterStepTraceTimeDataset(ClusterDataset):
step_dict = defaultdict(lambda: [0, 0, 0])
for step_bean in step_data:
if step_bean.type == self.RANK:
- step_dict[step_bean.index][0] += step_bean.compute
- step_dict[step_bean.index][1] += step_bean.communication
- step_dict[step_bean.index][2] += step_bean.free
+ step_rank_record = []
+ step = str(step_bean.step).replace(" ", "") or str(const.DEFAULT_STEP)
+ rank = str(step_bean.index).replace(" ", "")
+ if step:
+ step_rank_record.append(step)
+ if rank:
+ step_rank_record.append(rank)
+
+ step_rank_index = const.STEP_RANK_SEP.join(step_rank_record)
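+                # accumulate compute/communication/free time under the composite "step_rank" key, e.g. "0_7"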
+ step_dict[step_rank_index][0] += step_bean.compute
+ step_dict[step_rank_index][1] += step_bean.communication
+ step_dict[step_rank_index][2] += step_bean.free
+ if step_bean.type == self.STAGE:
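+                # parse rank ids from the stage string (e.g. "(0, 1, 2, 3)") and record each unique stage once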
+ stage = sorted(list(map(int, re.findall(r'\d+', step_bean.stage))))
+ if stage in self._stages:
+ continue
+ self._stages.append(stage)
return step_dict
def get_data(self):
return self._step_dict
+ def get_stages(self):
+ return sorted(self._stages)
+
@singleton
class ClusterCommunicationDataset(ClusterDataset):
@@ -156,7 +176,7 @@ class ClusterCommunicationDataset(ClusterDataset):
self.hccl_dict.setdefault(comm_group, defaultdict(lambda: defaultdict(list)))
for step, step_dict in group_dict.items():
for op, op_dict in step_dict.items():
- self.compute_bandwidth(op_dict)
+ self.compute_bandwidth(step.lower().lstrip("step") or str(const.DEFAULT_STEP), op_dict)
self.process_hccl_info(comm_group, step, op, op_dict)
def process_hccl_info(self, group, step, op, op_dict):
@@ -173,7 +193,7 @@ class ClusterCommunicationDataset(ClusterDataset):
msg = "[ERROR] Cluster_communication.json has invalid structure."
raise ValueError(msg) from e
- def compute_bandwidth(self, op_dict: dict):
+ def compute_bandwidth(self, step, op_dict: dict):
for rank_id, rank_dict in op_dict.items():
try:
rank = int(rank_id)
@@ -182,17 +202,17 @@ class ClusterCommunicationDataset(ClusterDataset):
raise ValueError(msg) from e
for comm_type, bw_dict in rank_dict.get(self.COMMUNICATION_BANDWIDTH_INFO, {}).items():
if comm_type == self.SDMA:
- self.rank_bw_dict[rank][self.SDMA_SIZE_MB] += bw_dict.get(self.TRANSIT_SIZE)
- self.rank_bw_dict[rank][self.SDMA_TIME_MS] += bw_dict.get(self.TRANSIT_TIME)
+                    step_rank = f"{step}{const.STEP_RANK_SEP}{rank}"
+                    self.rank_bw_dict[step_rank][self.SDMA_SIZE_MB] += bw_dict.get(self.TRANSIT_SIZE)
+                    self.rank_bw_dict[step_rank][self.SDMA_TIME_MS] += bw_dict.get(self.TRANSIT_TIME)
if comm_type == self.RDMA:
- self.rank_bw_dict[rank][self.RDMA_SIZE_MB] += bw_dict.get(self.TRANSIT_SIZE)
- self.rank_bw_dict[rank][self.RDMA_TIME_MS] += bw_dict.get(self.TRANSIT_TIME)
-
- for rank, rank_dict in self.rank_bw_dict.items():
- self.rank_bw_dict[rank][self.RDMA_BANDWIDTH] = self.compute_ratio(
- self.rank_bw_dict[rank][self.RDMA_SIZE_MB], self.rank_bw_dict[rank][self.RDMA_TIME_MS])
- self.rank_bw_dict[rank][self.SDMA_BANDWIDTH] = self.compute_ratio(
- self.rank_bw_dict[rank][self.SDMA_SIZE_MB], self.rank_bw_dict[rank][self.SDMA_TIME_MS])
+                    step_rank = f"{step}{const.STEP_RANK_SEP}{rank}"
+                    self.rank_bw_dict[step_rank][self.RDMA_SIZE_MB] += bw_dict.get(self.TRANSIT_SIZE)
+                    self.rank_bw_dict[step_rank][self.RDMA_TIME_MS] += bw_dict.get(self.TRANSIT_TIME)
+
+ for step_rank in self.rank_bw_dict.keys():
+ self.rank_bw_dict[step_rank][self.RDMA_BANDWIDTH] = self.compute_ratio(
+ self.rank_bw_dict[step_rank][self.RDMA_SIZE_MB], self.rank_bw_dict[step_rank][self.RDMA_TIME_MS])
+ self.rank_bw_dict[step_rank][self.SDMA_BANDWIDTH] = self.compute_ratio(
+ self.rank_bw_dict[step_rank][self.SDMA_SIZE_MB], self.rank_bw_dict[step_rank][self.SDMA_TIME_MS])
def get_data(self):
return self.rank_bw_dict
diff --git a/profiler/advisor/dataset/cluster/cluster_step_trace_time_bean.py b/profiler/advisor/dataset/cluster/cluster_step_trace_time_bean.py
index b108fc77a3f3408d48c79ce6b542f98427d88b0b..8ae0e55f2a5fbc05304fd95809e9b69220dfd3e5 100644
--- a/profiler/advisor/dataset/cluster/cluster_step_trace_time_bean.py
+++ b/profiler/advisor/dataset/cluster/cluster_step_trace_time_bean.py
@@ -65,3 +65,6 @@ class ClusterStepTraceTimeBean:
msg = "[ERROR] Cluster step trace time.csv has invalid value in column 'Free'."
raise ValueError(msg) from e
+ @property
+    def stage(self) -> str:
+ return self._data.get(self.INDEX)
diff --git a/profiler/advisor/display/html/priority_background_color.py b/profiler/advisor/display/html/priority_background_color.py
new file mode 100644
index 0000000000000000000000000000000000000000..7da61a093099bc6e9fedf40367176d500b04b185
--- /dev/null
+++ b/profiler/advisor/display/html/priority_background_color.py
@@ -0,0 +1,4 @@
+class PriorityBackgroundColor:
+ high = "#B5495B"
+ medium = "#fcaf17"
+ low = "#65c294"
diff --git a/profiler/advisor/display/html/render.py b/profiler/advisor/display/html/render.py
index 3984fa8f34f0858a7281c9b51caaa43a170baf86..0c1882f133e5ec8a94c8a1ee80d9c53f7bda989b 100644
--- a/profiler/advisor/display/html/render.py
+++ b/profiler/advisor/display/html/render.py
@@ -1,7 +1,7 @@
import os
import logging
from typing import List, Dict
-from collections import defaultdict
+from collections import defaultdict, OrderedDict
from jinja2 import Environment, FileSystemLoader
from profiler.advisor.common import constant
@@ -14,31 +14,72 @@ logger = logging.getLogger()
@singleton
class HTMLRender:
+ SUPPORTED_KEYS = ["main", "overall", "comparison", "computation", "schedule", "communication", "dataloader",
+ "memory"]
+ PERFORMANCE_PROBLEM_ANALYSIS = "performance_problem_analysis"
+
def __init__(self):
self.html = ""
self.render_list = defaultdict(list)
def render_html(self, template_dir: str = "templates", template_name: str = "main.html",
template_header=constant.DEFAULT_TEMPLATE_HEADER):
- self.html = self.render_template("main", template_dir, template_name, render_list=self.render_list,
+
+    # make sure overall and comparison appear before performance problem analysis
+ sorted_render_htmls = OrderedDict()
+ for key in ["overall", "comparison"]:
+ if key in self.render_list:
+ sorted_render_htmls[key] = self.render_list.get(key)
+ for key, html in self.render_list.items():
+ if key in sorted_render_htmls:
+ continue
+ sorted_render_htmls[key] = html
+
+ self.html = self.render_template("main", template_dir, template_name, render_list=sorted_render_htmls,
template_header=template_header)
- def render_template(self, key: str, template_dir: str, template_name: str, **kwargs):
+ def get_rendered_html(self, key: str, template_dir: str, template_name: str, **kwargs):
+ if key not in self.SUPPORTED_KEYS:
+ error_msg = f"Error render template key {key}, optionals are {self.SUPPORTED_KEYS}"
+ logger.error(error_msg)
+            raise ValueError(error_msg)
+
if not os.path.isabs(template_dir):
template_dir = os.path.join(os.path.dirname(__file__), template_dir)
env = Environment(loader=FileSystemLoader(template_dir),
autoescape=True)
template = env.get_template(template_name)
+ if "priority" not in kwargs:
+ kwargs["priority"] = "low priority"
rendered_html = template.render(**kwargs)
- self.render_list[key].append(rendered_html)
+ return rendered_html
+
+ def render_template(self, key: str, template_dir: str, template_name: str, **kwargs):
+ rendered_html = self.get_rendered_html(key, template_dir, template_name, **kwargs)
+
+ if not kwargs.get("add_render_list", True):
+ return rendered_html
+
+ if key in ["main", "overall", "comparison"]:
+ if key not in self.render_list:
+ self.render_list[key] = []
+ self.render_list[key].append(rendered_html)
+ else:
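+            # non-overview renders are nested one level deeper: {performance_problem_analysis: {key: [htmls]}}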
+ if self.PERFORMANCE_PROBLEM_ANALYSIS not in self.render_list:
+ self.render_list[self.PERFORMANCE_PROBLEM_ANALYSIS] = {}
+ if key not in self.render_list[self.PERFORMANCE_PROBLEM_ANALYSIS]:
+ self.render_list[self.PERFORMANCE_PROBLEM_ANALYSIS][key] = []
+ self.render_list[self.PERFORMANCE_PROBLEM_ANALYSIS][key].append(rendered_html)
+
return rendered_html
def save_to_file(self, save_path: str):
+ save_path = os.path.join(Config().work_path, save_path)
if not save_path.endswith(".html"):
logger.error("Skip save html file because file name must endswith `.html`, "
"but got %s.", os.path.basename(save_path))
return
safe_write(self.html, save_path)
- logger.info("Save suggestion to %s.", os.path.join(Config().work_path, save_path))
+ logger.info("Save suggestion to %s.", save_path)
diff --git a/profiler/advisor/display/html/templates/affinity_api.html b/profiler/advisor/display/html/templates/affinity_api.html
index 4d12c3e37536392d122f85fc6ef3a4fcc123ef77..2b4fa453271a8b8170690951fcea86c625562f3a 100644
--- a/profiler/advisor/display/html/templates/affinity_api.html
+++ b/profiler/advisor/display/html/templates/affinity_api.html
@@ -3,9 +3,9 @@
The analysis results of following affinity APIs are based on runtime env
- cann-{{ cann_version }}
+ cann-{{ cann_version }}
and
- torch-{{ torch_version }}
+ torch-{{ torch_version }}
@@ -13,7 +13,7 @@
Suggestion:
These APIs have no code stack. If parameter 'with_stack=False' was set while profiling, please refer to
Ascend PyTorch Profiler to set
-
'with_stack=True'. Otherwise, ignore following affinity APIs due to backward broadcast lack of stack.
+
'with_stack=True'. Otherwise, ignore following affinity APIs due to backward broadcast lack of stack.
{% endif %}
{% for api_name, stacks in result.items() %}
diff --git a/profiler/advisor/display/html/templates/main.html b/profiler/advisor/display/html/templates/main.html
index 3727125b419547fc6a9ac9743eab34e1e1b76256..61c52d1db906ed8754f17458eaaffcd5adfc4fe3 100644
--- a/profiler/advisor/display/html/templates/main.html
+++ b/profiler/advisor/display/html/templates/main.html
@@ -137,10 +137,21 @@
Performance Optimization Suggestions
+
+
+
Optimization Priority:
+
+
High
+
+
Medium
+
+
Low
+
+
{% for key, renders in render_list.items() %}
- {% if key == 'operator'%}
+ {% if key != 'performance_problem_analysis' %}
-
+
{% for render in renders %}
{{render|safe}}
@@ -148,14 +159,25 @@
{% else %}
+
-
+
- {% for render in renders %}
- {{render|safe}}
- {% endfor %}
+
+
+ {% for sub_key, sub_renders in renders.items() %}
+
+
+
+ {% for render in sub_renders %}
+ {{render|safe}}
+ {% endfor %}
+
+
+ {% endfor %}
+
{% endif %}
{% endfor %}