diff --git a/profiler/advisor/analyzer/analyzer_controller.py b/profiler/advisor/analyzer/analyzer_controller.py new file mode 100644 index 0000000000000000000000000000000000000000..a93c8e0acd9c499ccfff76dc0c9518a7e22af93b --- /dev/null +++ b/profiler/advisor/analyzer/analyzer_controller.py @@ -0,0 +1,381 @@ +import copy +import logging +import json +import sys +import os +from pathlib import Path + +sys.path.append(os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "compare_tools")) +sys.path.append(os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "cluster_analyse")) + +from profiler.advisor.analyzer.cluster.slow_rank_analyzer import SlowRankAnalyzer +from profiler.advisor.analyzer.cluster.slow_link_analyzer import SlowLinkAnalyzer +from profiler.advisor.analyzer.overall.overall_summary_analyzer import OverallSummaryAnalyzer +from profiler.advisor.common import constant as const +from profiler.advisor.common.analyzer_scopes import SupportedScopes +from profiler.advisor.interface.interface import Interface +from profiler.cluster_analyse.cluster_data_preprocess.pytorch_data_preprocessor import PytorchDataPreprocessor + +logger = logging.getLogger() + + +class AnalyzerController: + OBS_PREFIX = "obs://" + CLUSTER_RANK_THRESHOLD = 2 + + def __init__(self, dimensions, **kwargs): + self.dimensions = dimensions + self.kwargs = kwargs + self.slow_rank_analyzer = None + self.slow_link_analyzer = None + self.cluster_local_data_map = {} + self.default_rank_id = None + self.rank_id_map = {} + self._is_cluster = False + + @staticmethod + def _get_step_rank_for_cluster_statistic_diff(target_cluster_statistic_data, benchmark_cluster_statistic_data, + headers, dimension, get_max=False): + if dimension not in headers: + logger.error("Error dimension %s for cluster statistics data, optionals are %s.", dimension, headers) + return None, None + + dimension_index = headers.index(dimension) + diff_record = [] + for target_row_data, benchmark_row_data in zip(target_cluster_statistic_data, benchmark_cluster_statistic_data): + target_data = target_row_data[dimension_index] + benchmark_data = benchmark_row_data[dimension_index] + if not isinstance(target_data, (int, float)) or not isinstance(benchmark_data, (int, float)): + continue + diff_record.append(target_data - benchmark_data) + + if SlowRankAnalyzer.compute_max_gap_ratio(diff_record, sum(diff_record) / len( + diff_record)) < SlowRankAnalyzer.RATIO_THRESHOLD: + return None, None + + value = max(diff_record) if get_max else min(diff_record) + value_index = diff_record.index(value) + step = target_cluster_statistic_data[value_index][headers.index("step")] + target_rank_id = target_cluster_statistic_data[value_index][headers.index("rank_id")] + + return step, target_rank_id + + @staticmethod + def _get_target_profiling_path_for_obs(profiling_path, rank_id): + # TODO + return None + + def do_analysis(self): + result_list = [] + + profiling_path = self.kwargs.get("profiling_path") + benchmark_profiling_path = self.kwargs.get("benchmark_profiling_path") + + if not self._check_profiling_path_valid(profiling_path): + logger.error("Got invalid argument '-d/--profiling_path' %s, skip analysis", profiling_path) + return + if benchmark_profiling_path and not self._check_profiling_path_valid(benchmark_profiling_path): + logger.error("Got invalid argument '-bp/--benchmark_profiling_path' %s, skip analysis", + benchmark_profiling_path) + return + + self._is_cluster = self._is_cluster_profiling(profiling_path) + if not self._is_cluster: + job_list = self.single_rank_analysis(profiling_path, benchmark_profiling_path) + else: + job_list = self.cluster_analysis(profiling_path, benchmark_profiling_path) + + for i, (dimension, scope, interface, kwargs) in enumerate(job_list[::-1]): + result_list.append( + interface.get_result(dimension, scope, render_html=i == len(job_list) - 1, output_dict=False, **kwargs) + ) + + for result in result_list[::-1]: + if result and hasattr(result, "show"): + result.show() + break + + def single_rank_analysis(self, profiling_path, benchmark_profiling_path=None): + job_list = [] + + profiling_path = self._get_profiling_path_by_rank(profiling_path) + benchmark_profiling_path = self._get_profiling_path_by_rank(benchmark_profiling_path) + + # 单卡场景无通信和集群分析 + for dim in [Interface.COMMUNICATION, Interface.CLUSTER]: + if dim in self.dimensions: + self.dimensions.remove(dim) + + for dimension in self.dimensions: + dimension_analysis_func_name = f"{dimension}_analysis" + if not hasattr(self, dimension_analysis_func_name): + continue + logger.info("Start %s analysis", dimension) + job_list += getattr(self, dimension_analysis_func_name)(profiling_path) + + if benchmark_profiling_path: + # kernel/api 比对 + job_list += self._single_profiling_comparison(profiling_path, benchmark_profiling_path) + else: + # 单卡性能拆解 + self.overall(profiling_path) + return job_list + + def cluster_analysis(self, profiling_path, benchmark_profiling_path=None): + job_list = [] + + # 单集群profiling分析:下发、通信、计算、显存/内存 + for dimension in self.dimensions: + dimension_analysis_func_name = f"cluster_{dimension}_analysis" + if not hasattr(self, dimension_analysis_func_name): + continue + logger.info("Start cluster %s analysis", dimension) + job_list += getattr(self, dimension_analysis_func_name)(profiling_path) + + if benchmark_profiling_path: + # 两个集群profiling比对分析 + job_list += self._cluster_profiling_comparison(profiling_path, benchmark_profiling_path) + else: + # TODO 集群计算/下发/通信 cluster analysis topk + self.overall(profiling_path) + return job_list + + def overall(self, profiling_path): + if self._is_cluster: + self.slow_rank_analyzer.optimize(template_key=Interface.OVERALL) + self.slow_link_analyzer.optimize(template_key=Interface.OVERALL) + else: + overall_analyzer = OverallSummaryAnalyzer(profiling_path) + overall_analyzer.optimize() + + def schedule_analysis(self, profiling_path, benchmark_profiling_path=None, step=None, benchmark_step=None): + # 任意单卡的下发分析 + + kwargs = copy.deepcopy(self.kwargs) + job_list = [] + + kwargs["profiling_path"] = profiling_path + kwargs["benchmark_profiling_path"] = benchmark_profiling_path + kwargs["step"] = step + kwargs["benchmark_step"] = benchmark_step + + for dimension in [Interface.SCHEDULE, Interface.DATALOADER]: + for scope in Interface.get_scope(dimension): + interface = Interface(**kwargs) + job_list.append((dimension, scope, interface, kwargs)) + return job_list + + def computation_analysis(self, profiling_path, benchmark_profiling_path=None, step=None, + benchmark_step=None, stage=None): + # 任意单卡的计算分析 + + kwargs = copy.deepcopy(self.kwargs) + kwargs["profiling_path"] = profiling_path + kwargs["benchmark_profiling_path"] = benchmark_profiling_path + kwargs["step"] = step + kwargs["benchmark_step"] = benchmark_step + kwargs["stage"] = stage + job_list = [] + + for dimension in [Interface.COMPUTATION]: + for scope in Interface.get_scope(dimension): + if scope == SupportedScopes.STAGE_COMPUTE: + continue + interface = Interface(**kwargs) + job_list.append((dimension, scope, interface, kwargs)) + return job_list + + def communication_analysis(self, profiling_path, benchmark_profiling_path=None, step=None, benchmark_step=None): + # 任意单卡的通信分析 + kwargs = copy.deepcopy(self.kwargs) + job_list = [] + # TODO,根据当前的讨论,这块主要是基于带宽的分析,需要能闭环通信重传、算子带宽异常检测 等问题 + return job_list + + def cluster_schedule_analysis(self, profiling_path): + # 目标集群profiling数据下发分析,不包含两个集群profiling数据的比对分析 + + job_list = [] + global_step_rank = self.slow_rank_analyzer.get_global_step_rank(SlowRankAnalyzer.FREE) + slow_rank_id = global_step_rank.get("maximum", {}).get("rank_id") or self.default_rank_id + slow_step = global_step_rank.get("maximum", {}).get("step") + analysis_profiling_path = self._get_profiling_path_by_rank(profiling_path, slow_rank_id) + + logger.info("Rank %s and step %s with max free", slow_rank_id, slow_step) + + job_list += self.schedule_analysis(analysis_profiling_path, step=slow_step) + return job_list + + def cluster_communication_analysis(self, profiling_path): + # 目标集群profiling数据通信分析,不包含两个集群profiling数据的比对分析 + + job_list = [] + + for bindwidth_type in [SlowLinkAnalyzer.SDMA, SlowLinkAnalyzer.RDMA]: + global_step_rank = self.slow_link_analyzer.get_global_step_rank(bindwidth_type) + # 获取带宽最小的卡进行分析 + target_rank_id = global_step_rank.get("minimum", {}).get("rank_id") or self.default_rank_id + step = global_step_rank.get("minimum", {}).get("step") + analysis_profiling_path = self._get_profiling_path_by_rank(profiling_path, target_rank_id) + logger.info("Rank %s and step %s with minimum %s bindwidth", target_rank_id, step, bindwidth_type) + job_list += self.communication_analysis(analysis_profiling_path, step=step) + + return job_list + + def cluster_computation_analysis(self, profiling_path): + # 目标集群profiling数据计算分析,不包含两个集群profiling数据的比对分析;如果有pp stage,则对不同stage进行计算分析 + + job_list = [] + global_step_rank = self.slow_rank_analyzer.get_global_step_rank(SlowRankAnalyzer.COMPUTE) + _ = self.slow_rank_analyzer.get_stage_step_rank(SlowRankAnalyzer.COMPUTE) + + # TODO, 对不同pp stage取min max进行分析 + + # 不区分stage,对所有卡取Min max进行分析 + logger.info("Without pipeline parallel stage, Global analysis steps and ranks is %s", + json.dumps(global_step_rank)) + slow_rank_id = global_step_rank.get("maximum", {}).get("rank_id") or self.default_rank_id + slow_step = global_step_rank.get("maximum", {}).get("step") + # 如果没有标杆profiling数据的rank id,说明没有快慢卡问题,直接对默认rank id进行分析,因此这里取值为None + fast_rank_id = global_step_rank.get("minimum", {}).get("rank_id") + fast_step = global_step_rank.get("minimum", {}).get("step") + + logger.info("Max compute for rank %s with step %s, min compute for rank %s with step %s", slow_rank_id, + slow_step, fast_rank_id, fast_step) + + job_list += self.computation_analysis( + self._get_profiling_path_by_rank(profiling_path, slow_rank_id), + self._get_profiling_path_by_rank(profiling_path, fast_rank_id), + slow_step, + fast_step + ) + + return job_list + + def _single_profiling_comparison(self, profiling_path, benchmark_profiling_path, step=None, + benchmark_step=None): + # TODO 基于compare tools 对比计算下发 + kwargs = copy.deepcopy(self.kwargs) + return [] + + def _cluster_profiling_comparison(self, profiling_path, benchmark_profiling_path): + # 从计算、下发和通信三个维度对集群profiling数据进行对比 + + job_list = [] + benchmark_profiling_path = self._get_profiling_path_by_rank(benchmark_profiling_path) + benchmark_slow_rank_analyzer = SlowRankAnalyzer(benchmark_profiling_path) + benchmark_slow_link_analyzer = SlowLinkAnalyzer(benchmark_profiling_path) + + # 计算和下发分析 + job_list += self._cluster_data_comparison(profiling_path, + benchmark_profiling_path, + self.slow_rank_analyzer, + benchmark_slow_rank_analyzer, + get_max=True) + + # 通信分析 + job_list += self._cluster_data_comparison(profiling_path, + benchmark_profiling_path, + self.slow_link_analyzer, + benchmark_slow_link_analyzer, + get_max=False) + return job_list + + def _cluster_data_comparison(self, profiling_path, benchmark_profiling_path, target_cluster_analyzer, + benchmark_cluster_analyzer, get_max=False): + # #low rank/slow link结果逐行对比获取差值最大的rank和step进行单卡分析 + job_list = [] + + if isinstance(target_cluster_analyzer, SlowRankAnalyzer): + comparison_dims = [SlowRankAnalyzer.COMPUTE, SlowRankAnalyzer.FREE] + elif isinstance(target_cluster_analyzer, SlowLinkAnalyzer): + comparison_dims = [SlowLinkAnalyzer.SDMA, SlowLinkAnalyzer.RDMA] + else: + return job_list + + target_data = target_cluster_analyzer.format_datas.get("data", []) + benchmark_data = benchmark_cluster_analyzer.format_datas.get("data", []) + headers = benchmark_cluster_analyzer.format_datas.get("headers", []) + + if len(target_data) != len(benchmark_data): + logger.warning( + "Number of rank ids of Benchmark profiling not equals to target profiling, skip cluster comparison.") + return job_list + + for dimension in comparison_dims: + step_for_comparison, rank_id_for_comparison = AnalyzerController._get_step_rank_for_cluster_statistic_diff( + target_data, + benchmark_data, + headers, + dimension, + get_max=get_max + ) + rank_profiling_path = self._get_profiling_path_by_rank(profiling_path, rank_id_for_comparison) + rank_benchmark_profiling_path = self._get_profiling_path_by_rank( + benchmark_profiling_path, + rank_id_for_comparison + ) + + job_list += self._single_profiling_comparison( + rank_profiling_path, + rank_benchmark_profiling_path, + step_for_comparison, + step_for_comparison + ) + return job_list + + def _is_cluster_profiling(self, profiling_path): + # 判断是否为集群场景,obs场景没办法调用PytorchDataPreprocessor判断是否是集群场景,只能根据ascend_pt后缀来判断 + + if not profiling_path.startswith(self.OBS_PREFIX): + # 数据在本地盘 + path_list = [os.path.join(profiling_path, dir_name) for dir_name in os.listdir(profiling_path)] + ascend_pt_dirs = [path for path in path_list if os.path.isdir(path) and path.endswith("ascend_pt")] + data_processor = PytorchDataPreprocessor(ascend_pt_dirs) + + self.cluster_local_data_map[profiling_path] = data_processor.get_data_map() + + if not self.cluster_local_data_map or not self.cluster_local_data_map.get(profiling_path): + return False + + self.default_rank_id = list(self.cluster_local_data_map[profiling_path].keys())[0] + + self.slow_rank_analyzer = SlowRankAnalyzer(profiling_path) + self.slow_link_analyzer = SlowLinkAnalyzer(profiling_path) + return len(self.cluster_local_data_map[profiling_path]) >= self.CLUSTER_RANK_THRESHOLD + + # TODO, 数据在obs上 + return False + + def _get_profiling_path_by_rank(self, profiling_path, rank_id=None): + # 根据传入的集群/单卡 profiling数据路径,以及rank id,获取对应rank的profiling数据路径,如果数据在obs则会自动下载至本地。 + + if profiling_path is None: + return profiling_path + + if profiling_path.startswith(self.OBS_PREFIX): + # profiling数据在obs上时,自动下载对应rank的数据至本地,并返回本地路径 + return AnalyzerController._get_target_profiling_path_for_obs(profiling_path, rank_id) + + return self._get_target_profiling_path_for_local(profiling_path, rank_id) + + def _get_target_profiling_path_for_local(self, profiling_path, rank_id): + rank_id_map = self.cluster_local_data_map.get(profiling_path, {}) + if rank_id is None or not rank_id_map: + return profiling_path + + return rank_id_map.get(rank_id) + + def _check_profiling_path_valid(self, profiling_path): + # 检查profiling数据路径是否存在,支持obs路径和本地路径 + + if profiling_path.startswith(self.OBS_PREFIX): + # TODO + return False + elif not Path(profiling_path).exists(): + logger.error("Profiling path is not existed. Invalid profiling path: %s", profiling_path) + return False + else: + return True + + diff --git a/profiler/advisor/analyzer/base_analyzer.py b/profiler/advisor/analyzer/base_analyzer.py index 80368e1d60a14020637ba60bb41c5536dcf2e081..8938756115c89df272c0d542e97034d76850c2c6 100644 --- a/profiler/advisor/analyzer/base_analyzer.py +++ b/profiler/advisor/analyzer/base_analyzer.py @@ -63,7 +63,7 @@ class BaseAnalyzer(VersionControl, metaclass=ABCMeta): return None logger.info("Enable analysis %s with %s", self.__class__.__name__, ",".join(data_list)) - return func(self) + return func(self, **kwargs) return wrapper @@ -76,7 +76,7 @@ class BaseAnalyzer(VersionControl, metaclass=ABCMeta): def init_dataset_list(self)->None: dataset_cls_list = self.dataset_cls_list if len(dataset_cls_list) == 0: - logger.warning(f"Analyser: %s don't rely on any dataset!", self.__class__.__name__) + logger.warning(f"Analyzer: %s don't rely on any dataset!", self.__class__.__name__) return for dataset_cls in dataset_cls_list: diff --git a/profiler/advisor/analyzer/cluster/slow_link_analyser.py b/profiler/advisor/analyzer/cluster/slow_link_analyzer.py similarity index 57% rename from profiler/advisor/analyzer/cluster/slow_link_analyser.py rename to profiler/advisor/analyzer/cluster/slow_link_analyzer.py index 0b585cbc7c5f136b15cd9eb035ea2dac5caa9e4e..bb2ba5f2a4092baafee7a034f48d7dcc516a0f0e 100644 --- a/profiler/advisor/analyzer/cluster/slow_link_analyser.py +++ b/profiler/advisor/analyzer/cluster/slow_link_analyzer.py @@ -15,11 +15,16 @@ from collections import defaultdict from typing import Dict, List +import logging + from profiler.advisor.analyzer.base_analyzer import BaseAnalyzer from profiler.advisor.common import constant from profiler.advisor.result.result import OptimizeResult from profiler.advisor.result.item import OptimizeItem, OptimizeRecord from profiler.advisor.dataset.cluster.cluster_dataset import ClusterCommunicationDataset +from profiler.advisor.utils.utils import safe_index + +logger = logging.getLogger() class SlowLinkAnalyzer(BaseAnalyzer): @@ -34,7 +39,8 @@ class SlowLinkAnalyzer(BaseAnalyzer): TRANSIT_SIZE = "Transit Size(MB)" SDMA = "SDMA" RDMA = "RDMA" - SLOW_LINK_ANALYSIS = "slow_link_analysis" + SLOW_LINK_ANALYSIS = "slow link" + RATIO_THRESHOLD = 0.05 dataset_cls_list = [ClusterCommunicationDataset] def __init__(self, collection_path, n_processes: int = 1, **kwargs): @@ -45,18 +51,24 @@ class SlowLinkAnalyzer(BaseAnalyzer): self.result = OptimizeResult() self.bottelneck = '' self.suggestion = '' - self.format_datas = [] + self.format_datas = self.format_details() + + @staticmethod + def compute_max_gap_ratio(data: list, mean: float): + if mean == 0: + return 0 + else: + return (max(data) - min(data)) / mean def optimize(self, **kwargs): if self.rank_bw_dict is None: - print("Slow link analysis failed due to data loading failure. \ + logger.error("Slow link analysis failed due to data loading failure. \ Please check your cluster_analysis_output folder. \ If you are not concerned about this type of data, please ignore this message.") return self.result self.process() - self.format_datas = self.format_details() self.make_record() - self.make_render() + self.make_render(kwargs.get("template_key")) return self.result def process(self): @@ -69,7 +81,7 @@ class SlowLinkAnalyzer(BaseAnalyzer): if len(data_list) > 0: avg_bw = round(sum(data_list) / len(data_list), 3) else: - print("The slow link (identified bottleneck) cannot provide a bottleneck \ + logger.info("The slow link (identified bottleneck) cannot provide a bottleneck \ because the analysis data is missing bandwidth information.") return self.bottelneck += f'{link_type}: \n' \ @@ -88,14 +100,19 @@ class SlowLinkAnalyzer(BaseAnalyzer): details_dict = {} headers = list({k for rank_bw_value in self.rank_bw_dict.values() for k in rank_bw_value.keys()}) headers.sort() - data_list = [[rank_id] + [rank_bw.get(k, 0) for k in headers] for rank_id, rank_bw in self.rank_bw_dict.items()] - data_list.sort(key = lambda x: x[0]) # 按rank_id排序 - - details_dict["headers"] = ["rank_id"] + headers + + data_list = [] + for step_rank, rank_bw in self.rank_bw_dict.items(): + step_rank_list = list(map(int, step_rank.split(constant.STEP_RANK_SEP))) + value_list = [rank_bw.get(i, 0) for i in headers] + data_list.append(step_rank_list + value_list) + data_list.sort(key=lambda x: (x[0], x[1])) # 按rank_id排序 + + details_dict["headers"] = ["step", "rank_id"] + headers details_dict["data"] = data_list return details_dict - + def make_record(self): """ make record for what and how to optimize @@ -107,20 +124,65 @@ class SlowLinkAnalyzer(BaseAnalyzer): ) self.result.add(OptimizeRecord(optimization_item)) - for i, data in enumerate(self.format_datas["data"]): - self.result.add_detail(SlowLinkAnalyzer.SLOW_LINK_ANALYSIS, self.format_datas["headers"], data) + for data in self.format_datas.get("data", []): + self.result.add_detail(SlowLinkAnalyzer.SLOW_LINK_ANALYSIS, self.format_datas.get("headers", []), data) - def make_render(self): + def make_render(self, template_key="cluster"): result_for_html = { - "Description" : self.bottelneck, - "suggestion" : self.suggestion, - "details" : [self.format_datas] + "Description": self.bottelneck, + "suggestion": self.suggestion, + "details": [self.format_datas] } - self.html_render.render_template(key="cluster", + self.html_render.render_template(key=template_key, title=SlowLinkAnalyzer.SLOW_LINK_ANALYSIS, template_dir="templates", template_name="cluster_analysis.html", cann_version=self.cann_version, torch_version=self.torch_version, - result=result_for_html) \ No newline at end of file + result=result_for_html) + + def get_global_step_rank(self, bindwidth_type): + global_step_rank = {} + bindwidth_key_map = {self.RDMA: self.RDMA_BANDWIDTH, self.SDMA: self.SDMA_BANDWIDTH} + + if bindwidth_type not in bindwidth_key_map: + raise RuntimeError(f"Error bindwidth type {bindwidth_type}, optionals are {bindwidth_key_map.keys()}") + + headers = self.format_datas.get("headers") + + bindwidth_index = safe_index(headers, bindwidth_key_map.get(bindwidth_type)) + + if bindwidth_index is not None: + data_list = [tuple_list[bindwidth_index] for tuple_list in self.format_datas.get("data", [])] + max_bandwidth, min_bandwidth = max(data_list), min(data_list) + + if self.compute_max_gap_ratio(data_list, sum(data_list) / len( + data_list)) < self.RATIO_THRESHOLD: + return global_step_rank + + max_bandwidth_index = data_list.index(max_bandwidth) + min_bandwidth_index = data_list.index(min_bandwidth) + + rank_id_index = safe_index(headers, "rank_id") + step_index = safe_index(headers, "step") + + if rank_id_index is None: + return global_step_rank + + max_bandwidth_rank_id = self.format_datas.get("data")[max_bandwidth_index][rank_id_index] + min_bandwidth_rank_id = self.format_datas.get("data")[min_bandwidth_index][rank_id_index] + + if step_index is None: + max_bandwidth_step, min_bandwidth_step = constant.DEFAULT_STEP, constant.DEFAULT_STEP + else: + max_bandwidth_step = self.format_datas.get("data")[max_bandwidth_index][step_index] + min_bandwidth_step = self.format_datas.get("data")[min_bandwidth_index][step_index] + + global_step_rank["maximum"] = {"rank_id": max_bandwidth_rank_id, "step": max_bandwidth_step} + global_step_rank["minimum"] = {"rank_id": min_bandwidth_rank_id, "step": min_bandwidth_step} + + return global_step_rank + + def get_priority(self): + pass \ No newline at end of file diff --git a/profiler/advisor/analyzer/cluster/slow_rank_analyser.py b/profiler/advisor/analyzer/cluster/slow_rank_analyzer.py similarity index 43% rename from profiler/advisor/analyzer/cluster/slow_rank_analyser.py rename to profiler/advisor/analyzer/cluster/slow_rank_analyzer.py index f439b31f7736ee4777d5ef10bf968738a76ae1b3..fb81883fee4685fa8c76281ace4a9b3edb204312 100644 --- a/profiler/advisor/analyzer/cluster/slow_rank_analyser.py +++ b/profiler/advisor/analyzer/cluster/slow_rank_analyzer.py @@ -13,43 +13,61 @@ # See the License for the specific language governing permissions and # limitations under the License. -from collections import defaultdict -from typing import Dict, List +import logging + from profiler.advisor.analyzer.base_analyzer import BaseAnalyzer from profiler.advisor.common import constant from profiler.advisor.result.result import OptimizeResult from profiler.advisor.result.item import OptimizeItem, OptimizeRecord from profiler.advisor.dataset.cluster.cluster_dataset import ClusterStepTraceTimeDataset +from profiler.advisor.utils.utils import safe_index + +logger = logging.getLogger() class SlowRankAnalyzer(BaseAnalyzer): - SLOW_RANK_ANALYSIS = "slow_rank_analysis" + SLOW_RANK_ANALYSIS = "slow rank" RANK = "rank" RATIO_THRESHOLD = 0.05 BOTTLENECK_LIST = ['Computing', 'Communication', "Free"] dataset_cls_list = [ClusterStepTraceTimeDataset] + COMPUTE = "compute(us)" + FREE = "free(us)" + COMMUNICATION = "communication(us)" def __init__(self, collection_path, n_processes: int = 1, **kwargs): super().__init__(collection_path, n_processes, **kwargs) key = ClusterStepTraceTimeDataset.get_key() - self.step_trace_class = self.get_first_data_by_key(self.dataset_list, key) + self.step_trace_class = self.get_first_data_by_key(self.dataset_list, key) self.step_trace_dict = self.step_trace_class.get_data() + self.stages = self.step_trace_class.get_stages() self.result = OptimizeResult() self.bottelneck = '' self.suggestion = '' - self.format_datas = [] + self._steps = set() + self.format_datas = self.format_details() + + @property + def steps(self): + return sorted(list(self._steps)) + + @staticmethod + def compute_max_gap_ratio(data: list, mean: float): + if mean == 0: + return 0 + else: + return (max(data) - min(data)) / mean def optimize(self, **kwargs): if self.step_trace_dict is None: - print("slow_rank 分析失败,原因是数据加载失败,请检查你的cluster_analysis_outpu文件夹 \ + logger.error("slow_rank 分析失败,原因是数据加载失败,请检查你的cluster_analysis_outpu文件夹 \ 如不关心这类数据请忽略") return self.result self.process() - self.format_datas = self.format_details() self.make_record() - self.make_render() + self.make_render(kwargs.get("template_key")) return self.result - + def process(self): total_time_list = [sum(data_tuple) for rank_id, data_tuple in self.step_trace_dict.items()] if total_time_list: @@ -57,6 +75,9 @@ class SlowRankAnalyzer(BaseAnalyzer): for i in range(len(self.BOTTLENECK_LIST)): self.produce_bottleneck(self.step_trace_dict, i, mean_total_time) + if not self.bottelneck: + self.bottelneck = "There is no slow rank issues" + def produce_bottleneck(self, step_dict: dict, produce_type: int, mean_total_time: float): data_list = [data_tuple[produce_type] for rank_id, data_tuple in step_dict.items()] max_ratio = self.compute_max_gap_ratio(data_list, mean_total_time) @@ -70,33 +91,41 @@ class SlowRankAnalyzer(BaseAnalyzer): """ make record for what and how to optimize """ + optimization_item = OptimizeItem( SlowRankAnalyzer.SLOW_RANK_ANALYSIS, self.bottelneck, self.suggestion ) self.result.add(OptimizeRecord(optimization_item)) - for i, data in enumerate(self.format_datas["data"]): - self.result.add_detail(SlowRankAnalyzer.SLOW_RANK_ANALYSIS, self.format_datas["headers"], data) + + data_list = self.format_datas.get("data", []) + headers = self.format_datas.get("headers", []) + for data in data_list: + self.result.add_detail(SlowRankAnalyzer.SLOW_RANK_ANALYSIS, headers, data) def format_details(self): details_dict = {} - headers = ["rank_id", "compute(us)", "communication(us)", "free(us)"] + headers = ["step", "rank_id", "compute(us)", "communication(us)", "free(us)"] data_list = [] - for key,value in self.step_trace_dict.items(): - data_list.append([key] + value) + for key, value in self.step_trace_dict.items(): + step, rank_id = key.split(constant.STEP_RANK_SEP) + data_list.append([int(step), int(rank_id)] + value) + if step and step not in self._steps: + self._steps.add(step) + details_dict["headers"] = headers - details_dict["data"] = data_list + details_dict["data"] = sorted(data_list, key=lambda x: (x[0], x[1])) return details_dict - def make_render(self): + def make_render(self, template_key="cluster"): result_for_html = { - "Description" : self.bottelneck, - "suggestion" : self.suggestion, - "details" : [self.format_datas] + "Description": self.bottelneck, + "suggestion": self.suggestion, + "details": [self.format_datas] } - self.html_render.render_template(key="cluster", + self.html_render.render_template(key=template_key, title=SlowRankAnalyzer.SLOW_RANK_ANALYSIS, template_dir="templates", template_name="cluster_analysis.html", @@ -104,9 +133,83 @@ class SlowRankAnalyzer(BaseAnalyzer): torch_version=self.torch_version, result=result_for_html) - @staticmethod - def compute_max_gap_ratio(data: list, mean: float): - if mean == 0: - return 0 + def get_global_step_rank(self, dimension): + global_step_rank = {} + + headers = self.format_datas.get("headers") + + dimension_index = safe_index(headers, dimension) + rank_id_index = safe_index(headers, "rank_id") + step_index = safe_index(headers, "step") + if dimension_index is None or rank_id_index is None: + return global_step_rank + + data_list = [tuple_list[dimension_index] for tuple_list in self.format_datas.get("data")] + max_time, min_time = max(data_list), min(data_list) + + if self.compute_max_gap_ratio(data_list, sum(data_list) / len( + data_list)) < self.RATIO_THRESHOLD: + return global_step_rank + max_time_index = data_list.index(max_time) + min_time_index = data_list.index(min_time) + + max_time_rank_id = self.format_datas.get("data")[max_time_index][rank_id_index] + min_time_rank_id = self.format_datas.get("data")[min_time_index][rank_id_index] + + if step_index is not None: + max_time_step = self.format_datas.get("data")[max_time_index][step_index] + min_time_step = self.format_datas.get("data")[min_time_index][step_index] else: - return (max(data) - min(data)) / mean + max_time_step, min_time_step = constant.DEFAULT_STEP, constant.DEFAULT_STEP + + global_step_rank["maximum"] = {"rank_id": max_time_rank_id, "step": max_time_step} + global_step_rank["minimum"] = {"rank_id": min_time_rank_id, "step": min_time_step} + + return global_step_rank + + def get_stage_step_rank(self, dimension): + stage_step_rank = {} + + headers = self.format_datas.get("headers") + dimension_index = safe_index(headers, dimension) + rank_id_index = safe_index(headers, "rank_id") + step_index = safe_index(headers, "step") + if dimension_index is None or rank_id_index is None: + return stage_step_rank + + rank_list = [tuple_list[rank_id_index] for tuple_list in self.format_datas.get("data")] + cost_time_list = [tuple_list[dimension_index] for tuple_list in self.format_datas.get("data")] + + if step_index is not None: + step_list = [tuple_list[step_index] for tuple_list in self.format_datas.get("data")] + else: + step_list = [constant.DEFAULT_STEP] * len(rank_list) + + for index, stage in enumerate(self.stages): + tmp_step_list, tmp_rank_list, tmp_time_list = [], [], [] + for step, rank_id, time in zip(step_list, rank_list, cost_time_list): + if rank_id not in stage: + continue + + tmp_step_list.append(step) + tmp_rank_list.append(rank_id) + tmp_time_list.append(time) + + if self.compute_max_gap_ratio(tmp_time_list, sum(tmp_time_list) / len( + tmp_time_list)) < self.RATIO_THRESHOLD: + continue + + max_time, min_time = max(tmp_time_list), min(tmp_time_list) + max_time_index, min_time_index = tmp_time_list.index(max_time), tmp_time_list.index(min_time) + + stage_key = f"stage-{index}" + stage_step_rank[stage_key] = {} + stage_step_rank[stage_key]["maximum"] = {"rank_id": tmp_rank_list[max_time_index], + "step": tmp_step_list[max_time_index]} + stage_step_rank[stage_key]["minimum"] = {"rank_id": tmp_rank_list[min_time_index], + "step": tmp_step_list[min_time_index]} + + return stage_step_rank + + def get_priority(self): + pass \ No newline at end of file diff --git a/profiler/advisor/analyzer/computation/ai_core_freq/ai_core_freq_analyzer.py b/profiler/advisor/analyzer/computation/ai_core_freq/ai_core_freq_analyzer.py index 4f25deff7c0cdb415ccae6ab748304d4044c5eec..52e5594bf0f19154270a1e0aa073b534c703e2cd 100644 --- a/profiler/advisor/analyzer/computation/ai_core_freq/ai_core_freq_analyzer.py +++ b/profiler/advisor/analyzer/computation/ai_core_freq/ai_core_freq_analyzer.py @@ -26,11 +26,10 @@ class AICoreFreqAnalyzer(BaseAnalyzer): if not Config().get_config("aic_frequency"): logger.warning("Can not find ai core frequency in info.json*, please check data integrity.") return self.result + add_render_list = kwargs.get("add_render_list", True) ai_core_freq_checker = AICoreFreqChecker() - ai_core_freq_checker.check_ai_core_freq(self.dataset) - if not ai_core_freq_checker.ai_core_freq_issues: - return self.result + ai_core_freq_checker.check_ai_core_freq(self.dataset, rank_id=kwargs.get("rank_id"), stage=kwargs.get("stage")) ai_core_freq_checker.make_record(self.result) self.html = ai_core_freq_checker.make_render(self.html_render, add_render_list) return self.result diff --git a/profiler/advisor/analyzer/computation/ai_core_freq/ai_core_freq_checker.py b/profiler/advisor/analyzer/computation/ai_core_freq/ai_core_freq_checker.py index 7afa09cca48fd9939c4fcbfdf2a9fb5f29e3b468..28155207e3a4c6ccce70687ee2561c6ecbb80876 100644 --- a/profiler/advisor/analyzer/computation/ai_core_freq/ai_core_freq_checker.py +++ b/profiler/advisor/analyzer/computation/ai_core_freq/ai_core_freq_checker.py @@ -61,6 +61,8 @@ class AICoreFreqChecker: self.decrease_freq_ops.sort(key= lambda x: (x[self.TOTAL_DURATION_INDEX], x[self.DECREASE_FREQ_RATIO_INDEX]), reverse=True) + if not self.ai_core_freq_issues: + return self.desc = (f"{len(self.decrease_freq_ops)} operators are found during frequency reduction, and the reduction " f"ratio is larger than {self.DECREASE_FREQ_RATIO}.") @@ -72,22 +74,27 @@ class AICoreFreqChecker: """ make record for what and how to optimize """ - optimization_item = OptimizeItem("AI Core Frequency", self.desc, [self.suggestions]) + if not self.ai_core_freq_issues: + return + + sheet_name = "AI Core Frequency" + if self.rank_id is not None: + sheet_name = f"rank {self.rank_id} AI Core Frequency".capitalize() + + optimization_item = OptimizeItem(sheet_name, self.desc, [self.suggestions]) result.add(OptimizeRecord(optimization_item)) self.headers = ["Operator name", "Count", "Total duration(us)", "AI CORE frequency decreased ratio", "Average frequency", "Max frequency", "Min frequency"] - if self.rank_id: - self.headers = ["Rank id"] + self.headers - sub_table_name = "AI Core Frequency" if not self.stage else f"Stage-{self.stage}: AI Core Frequency" - result.add_detail(sub_table_name, headers=self.headers) + result.add_detail(sheet_name, headers=self.headers) for row in self.decrease_freq_ops: - if self.rank_id: - row = [self.rank_id] + row - result.add_detail(sub_table_name, detail=row) + result.add_detail(sheet_name, detail=row) def make_render(self, html_render, add_render_list=True): + if not self.ai_core_freq_issues: + return None + if self.SHOW_TOPK_OPS: self.desc += f" Only show {self.SHOW_TOPK_OPS} operators here, see latest mstt_advisor.xlsx for details." return html_render.render_template(key="computation", diff --git a/profiler/advisor/analyzer/computation/aicpu/aicpu_checker.py b/profiler/advisor/analyzer/computation/aicpu/aicpu_checker.py index 0caede4b894e0dda15333e6d3a480fa943c66323..05637cb06f098c45ba02a94c26061e62da4d05a1 100644 --- a/profiler/advisor/analyzer/computation/aicpu/aicpu_checker.py +++ b/profiler/advisor/analyzer/computation/aicpu/aicpu_checker.py @@ -145,11 +145,14 @@ class AicpuChecker(OperatorChecker): ",".join(double_type_ai_cpu_operator))) return True - def make_render(self, html_render, record): - html_render.render_template(key="computation", - template_dir="templates", - template_name="operator_ai_cpu.html", - format_result=self.format_operator_result(record, constant.OPERATOR_LIST_UNLIMIT)) + def make_render(self, html_render, record, add_render_list=True): + + return html_render.render_template(key="computation", + template_dir="templates", + template_name="operator_ai_cpu.html", + format_result=self.format_operator_result(record, + constant.OPERATOR_LIST_UNLIMIT), + add_render_list=add_render_list) def format_operator_result(self, record, limit): """ diff --git a/profiler/advisor/analyzer/computation/bound/block_dim_checker.py b/profiler/advisor/analyzer/computation/bound/block_dim_checker.py index 7a873c65635fcc8f2ebb35c8d317de09d78da491..f04c668ab2f5b0aea89681641021bcdda7bf7e91 100644 --- a/profiler/advisor/analyzer/computation/bound/block_dim_checker.py +++ b/profiler/advisor/analyzer/computation/bound/block_dim_checker.py @@ -45,11 +45,13 @@ class BlockDimChecker(OperatorChecker): "task duration are as follows:\n" return True - def make_render(self, html_render, record): - html_render.render_template(key="computation", - template_dir="templates", - template_name="operator_block_dim.html", - format_result=self.format_operator_result(record, constant.OPERATOR_OUT_TOPK)) + def make_render(self, html_render, record, add_render_list=True): + return html_render.render_template(key="computation", + template_dir="templates", + template_name="operator_block_dim.html", + format_result=self.format_operator_result(record, + constant.OPERATOR_OUT_TOPK), + add_render_list=add_render_list) def _check_operator(self, op_info) -> bool: if op_info.task_type not in ["AI_CORE", "AI_VECTOR_CORE", "MIX_AIC"]: diff --git a/profiler/advisor/analyzer/computation/bound/operator_bound_checker.py b/profiler/advisor/analyzer/computation/bound/operator_bound_checker.py index a22b380f974b14207d6d7be262cd49f0ba0fbe99..9e4179425dfc57260541ea0301dae7174b2c7951 100644 --- a/profiler/advisor/analyzer/computation/bound/operator_bound_checker.py +++ b/profiler/advisor/analyzer/computation/bound/operator_bound_checker.py @@ -46,8 +46,10 @@ class OperatorBoundChecker(OperatorChecker): return False return True - def make_render(self, html_render, record): - html_render.render_template(key="computation", - template_dir="templates", - template_name="operator_no_bound.html", - format_result=self.format_operator_result(record, constant.OPERATOR_OUT_TOPK)) + def make_render(self, html_render, record, add_render_list=True): + return html_render.render_template(key="computation", + template_dir="templates", + template_name="operator_no_bound.html", + format_result=self.format_operator_result(record, + constant.OPERATOR_OUT_TOPK), + add_render_list=add_render_list) diff --git a/profiler/advisor/analyzer/computation/op_compile/dynamic_shape_checker.py b/profiler/advisor/analyzer/computation/op_compile/dynamic_shape_checker.py index 86d3bac4ff8cb163d23a6365307b855839b12a6a..916534434f143dbacf1488cd88b20b58c26eb5b2 100644 --- a/profiler/advisor/analyzer/computation/op_compile/dynamic_shape_checker.py +++ b/profiler/advisor/analyzer/computation/op_compile/dynamic_shape_checker.py @@ -27,11 +27,13 @@ class DynamicShapeChecker(OperatorChecker): def check(self, profiling_database) -> bool: return self.is_dynamic_shape(profiling_database) - def make_record(self, profiling_database) -> OptimizeRecord: + def make_record(self, profiling_database, rank_id=None) -> OptimizeRecord: """ make record for what and how to optimize """ + if rank_id is not None: + self._PROBLEM = f"rank {rank_id} ".capitalize() + self._PROBLEM.lower() optimization_item = OptimizeItem( self._PROBLEM, self._description, @@ -58,8 +60,9 @@ class DynamicShapeChecker(OperatorChecker): format_result = {"record": record.__dict__, "suggestion": '
'.join(release_suggestion_list)} return format_result - def make_render(self, html_render, record): - html_render.render_template(key="computation", - template_dir="templates", - template_name="operator_dynamic_shape.html", - format_result=self.format_operator_result(record)) + def make_render(self, html_render, record, add_render_list=True): + return html_render.render_template(key="computation", + template_dir="templates", + template_name="operator_dynamic_shape.html", + format_result=self.format_operator_result(record), + add_render_list=add_render_list) diff --git a/profiler/advisor/analyzer/computation/operator_checker.py b/profiler/advisor/analyzer/computation/operator_checker.py index 64618b56a8df7f380277e99ae7ca47cd69d24648..908659643baab3754904530a117f15a23ded50b6 100644 --- a/profiler/advisor/analyzer/computation/operator_checker.py +++ b/profiler/advisor/analyzer/computation/operator_checker.py @@ -40,6 +40,23 @@ class OperatorChecker(VersionControl): self.cann_version = cann_version self._op_list: List[OpInfo] = [] + @classmethod + def get_name(cls): + """ + get name of checker + :return: checker name + """ + return cls._PROBLEM + + @staticmethod + def get_ratio(op_info: OpInfo, attr: str) -> float: + if not op_info.has_attr(attr): + return 0 + value = op_info.get_attr(attr) + if not value or value == "N/A": + return 0 + return float(value) + def check(self, profiling_data: ProfilingDataset) -> bool: """ check if any operator need optimize @@ -77,12 +94,16 @@ class OperatorChecker(VersionControl): return True return False - def make_record(self, profiling_data: ProfilingDataset): + def make_record(self, profiling_data: ProfilingDataset, rank_id=None): """ Make record for what and how to optimize :param profiling_data: profiling data :return: optimize record """ + + if rank_id is not None: + self._PROBLEM = f"rank {rank_id} ".capitalize() + self._PROBLEM.lower() + task_duration_list = [float(op_info.get_attr("task_duration")) for op_info in self._op_list if hasattr(op_info, "get_attr")] total_cost_time = sum(task_duration_list) @@ -239,14 +260,6 @@ class OperatorChecker(VersionControl): """Get node views.""" return [] - @classmethod - def get_name(cls): - """ - get name of checker - :return: checker name - """ - return cls._PROBLEM - def get_incomes(self) -> float: """get incomes""" incomes = 0.0 @@ -270,15 +283,6 @@ class OperatorChecker(VersionControl): return False return True - @staticmethod - def get_ratio(op_info: OpInfo, attr: str) -> float: - if not op_info.has_attr(attr): - return 0 - value = op_info.get_attr(attr) - if not value or value == "N/A": - return 0 - return float(value) - def get_details(self) -> list: """ get details of operator to be optimized diff --git a/profiler/advisor/analyzer/computation/profiling_analyzer.py b/profiler/advisor/analyzer/computation/profiling_analyzer.py index 2021bcd5765d1df7489f202b3453a83924fb28dc..d9d59ca0092fd451bff2a36680ab029eec573213 100644 --- a/profiler/advisor/analyzer/computation/profiling_analyzer.py +++ b/profiler/advisor/analyzer/computation/profiling_analyzer.py @@ -22,6 +22,7 @@ class ProfilingAnalyzer(BaseAnalyzer, ABC): self.checker = OperatorChecker(self.cann_version) self.html_render = HTMLRender() self.result = OptimizeResult() + self.html = None @BaseAnalyzer.check_data((ProfilingDataset.get_key(),)) def optimize(self, **kwargs) -> OptimizeResult: @@ -32,22 +33,28 @@ class ProfilingAnalyzer(BaseAnalyzer, ABC): """ profiling_data = self.get_first_data_by_key(self.dataset_list, ProfilingDataset.get_key()) checker = self.checker + rank_id = kwargs.get("rank_id") + + add_render_list = kwargs.get("add_render_list", True) + if not checker.pre_check(profiling_data): return self.result if checker.check(profiling_data): # add record - record = checker.make_record(profiling_data) - checker.make_render(self.html_render, record) + record = checker.make_record(profiling_data, rank_id) + self.html = checker.make_render(self.html_render, record, add_render_list) self.result.add(record) # add details details = checker.get_details() if details: for i, detail in enumerate(details): + sheet_name = checker.get_name() if rank_id is None else \ + f"rank {rank_id} ".capitalize() + checker.get_name() if i == 0: # the first row is header - self.result.add_detail(checker.get_name(), headers=detail) + self.result.add_detail(sheet_name, headers=detail) else: - self.result.add_detail(checker.get_name(), detail=detail) + self.result.add_detail(sheet_name, detail=detail) # add tune op list tune_op_list = checker.get_tune_op_list() if tune_op_list: diff --git a/profiler/advisor/analyzer/graph_fusion/graph_fusion_analyzer.py b/profiler/advisor/analyzer/graph_fusion/graph_fusion_analyzer.py index 326be83b8d49088b1563ccd8c08b68a4aa3001ef..3cbf5f76e1d0e86feab170e1547d47c523cf0280 100644 --- a/profiler/advisor/analyzer/graph_fusion/graph_fusion_analyzer.py +++ b/profiler/advisor/analyzer/graph_fusion/graph_fusion_analyzer.py @@ -20,17 +20,19 @@ class FusionOPAnalyzer(BaseAnalyzer): super(FusionOPAnalyzer, self).__init__(collection_path, **kwargs) self.result = OptimizeResult() self.html_render = HTMLRender() - + self.html = None + @BaseAnalyzer.check_data((GraphDataset.get_key(),)) def optimize(self, **kwargs): """ :return: result """ - self._check(self.dataset_list.get("GraphDataset"), self.dataset_list.get("ProfilingDataset")) + self._check(self.dataset_list.get("GraphDataset"), self.dataset_list.get("ProfilingDataset"), + kwargs.get("add_render_list")) return self.result - def _check(self, graph_data: List[GraphDataset], - profiling_data: List[ProfilingDataset] = None) -> None: + def _check(self, graph_data: List[GraphDataset], profiling_data: List[ProfilingDataset] = None, + add_render_list=True) -> None: if len(graph_data) == 0 or graph_data[0].is_empty(): return for _, rule in self.RULES.items(): @@ -40,10 +42,4 @@ class FusionOPAnalyzer(BaseAnalyzer): else: checker.find_fusion_matched_issues_with_times(graph_data, profiling_data) checker.make_record(self.result) - checker.make_render(self.html_render) - - def make_record(self): - pass - - def make_render(self): - pass + self.html = checker.make_render(self.html_render, add_render_list) \ No newline at end of file diff --git a/profiler/advisor/analyzer/overall/overall_summary_analyzer.py b/profiler/advisor/analyzer/overall/overall_summary_analyzer.py index 8e93dbda77d4915e716af856114184324d1d8807..8c328fe07e5a4acec4cc0d931b2f234ca6470835 100644 --- a/profiler/advisor/analyzer/overall/overall_summary_analyzer.py +++ b/profiler/advisor/analyzer/overall/overall_summary_analyzer.py @@ -23,7 +23,7 @@ from profiler.compare_tools.compare_interface.comparison_interface import Compar class OverallSummaryAnalyzer(BaseAnalyzer): - OVERALL_SUMMARY_ANALYZER = "overall_summary_analysis" + OVERALL_SUMMARY_ANALYZER = "overall summary" advice_map = { "Computing Time": "if you want more detailed advice please go to mstt_advisor_*.html", "Uncovered Communication Time": "if you want more detailed advice please go to mstt_advisor_*.html", diff --git a/profiler/advisor/analyzer/schedule/fusion_ops/fusion_ops_analyzer.py b/profiler/advisor/analyzer/schedule/fusion_ops/fusion_ops_analyzer.py index c1eb24b8e1e11ac167a7eb9333867167a57dd524..1a6dd194f80d776ccfee405dfb3a790e1d9e8d51 100644 --- a/profiler/advisor/analyzer/schedule/fusion_ops/fusion_ops_analyzer.py +++ b/profiler/advisor/analyzer/schedule/fusion_ops/fusion_ops_analyzer.py @@ -154,8 +154,9 @@ class TimelineFusionOpsAnalyzer(BaseAnalyzer): timeline_profiling_doc_url=const.TIMELINE_WITH_STACK_DOC_URL ) + sheet_name = "Affinity apis" optimization_item = OptimizeItem( - SupportedScopes.TIMELINE_FUSION_OPS, + sheet_name, desc, [suggestion] ) @@ -163,16 +164,16 @@ class TimelineFusionOpsAnalyzer(BaseAnalyzer): self.result.add(OptimizeRecord(optimization_item)) record_title = ["Affinity API", "Code stacks", "Stack called counts"] - self.result.add_detail(SupportedScopes.TIMELINE_FUSION_OPS, headers=record_title) + self.result.add_detail(sheet_name, headers=record_title) for api_name, stacks_info in format_timeline_result(self.matched_op_stacks).items(): if not stacks_info: detail = [api_name, "null", "null"] - self.result.add_detail(SupportedScopes.TIMELINE_FUSION_OPS, detail=detail) + self.result.add_detail(sheet_name, detail=detail) else: for stack in stacks_info: detail = [api_name, *stack] - self.result.add_detail(SupportedScopes.TIMELINE_FUSION_OPS, detail=detail) + self.result.add_detail(sheet_name, detail=detail) def make_render(self): format_result_for_html = format_timeline_result(dict(self.matched_op_stacks), dump_html=True) diff --git a/profiler/advisor/common/constant.py b/profiler/advisor/common/constant.py index c97cfbfd11e27a3d83ea2f9a25ea7870899bcfd1..53b5a64e26d9eb148d73ae1dd39f7c767975b524 100644 --- a/profiler/advisor/common/constant.py +++ b/profiler/advisor/common/constant.py @@ -33,6 +33,7 @@ TASK_TYPE = "Task Type" CPU_OP = "cpu_op" AI_CORE = "AI_CORE" AI_CPU = "AI_CPU" +MIX_AIC = "MIX_AIC" CALL_STACKS = "Call stack" INPUT_DIMS = "Input Dims" OP_SEP = "-" @@ -48,8 +49,7 @@ NO_STACK_REASON_MAP = { TIMELINE_BACKWARD_NO_STACK_CODE: "Backward broadcast, without call stacks in profiling.", TIMELINE_ACL_TO_NPU_NO_STACK_CODE: "Incoming flow is 'acl_to_npu', without call stacks in profiling." } -TIMELINE_API_DOC_URL = "https://gitee.com/ascend/mstt/blob/master/profiler/advisor/doc/"\ - "Samples%20of%20Fused%20Operator%20API%20Replacement.md" +TIMELINE_API_DOC_URL = "https://gitee.com/ascend/mstt/blob/master/profiler/advisor/doc/Samples%20of%20Fused%20Operator%20API%20Replacement.md" AFFINITY_TRAINING_API = "Affinity training api" TIMELINE_WITH_STACK_DOC_URL = "https://www.hiascend.com/document/detail/zh/canncommercial/" \ "70RC1/modeldevpt/ptmigr/AImpug_0067.html" @@ -156,7 +156,17 @@ COMMUNICATION_JSON = "communication.json" BOTTLENECK = "bottleneck" DATA = "data" - +ADVISOR_ANALYSIS_OUTPUT_DIR = "advisor_analysis_result" +DEFAULT_PROCESSES = 8 +CLUSTER_ANALYSIS_FILE_PATTERN = [r'profiler_info_\d+\.json', "step_trace_time.csv", "communication.json", + "communication_matrix.json"] +ANALYSIS_OUTPUT_PATH = "ANALYSIS_OUTPUT_PATH" +DEFAULT_RANK_FOR_PROFILING_ANALYSIS = 0 +PROFILER_INFO_FILE_PATTERN = r"profiler_info_(\d+)\.json" +DISABLE_STREAMINIG_READER = "DISABLE_STREAMINIG_READER" FRAMEWORK_STACK_BLACK_LIST = ["torch", "torch_npu", "megatron", "deepspeed"] DISABLE_STREAMING_READER = "DISABLE_STREAMING_READER" MAX_FILE_SIZE = 10**10 +MAX_NUM_PROCESSES = 4 +DEFAULT_STEP = "-1" +STEP_RANK_SEP = "_" \ No newline at end of file diff --git a/profiler/advisor/dataset/cluster/cluster_dataset.py b/profiler/advisor/dataset/cluster/cluster_dataset.py index b4956139c58436f6998ea8ce94a56fc280c038c3..cd750796241f129b41d07552a60a7baa1f882a57 100644 --- a/profiler/advisor/dataset/cluster/cluster_dataset.py +++ b/profiler/advisor/dataset/cluster/cluster_dataset.py @@ -15,6 +15,7 @@ import logging import os +import re from profiler.advisor.dataset.dataset import Dataset from profiler.advisor.utils.utils import singleton @@ -79,9 +80,11 @@ class ClusterDataset(Dataset): @singleton class ClusterStepTraceTimeDataset(ClusterDataset): RANK = "rank" + STAGE = "stage" def __init__(self, collection_path: str, data: dict, **kwargs): self._step_dict = defaultdict() + self._stages = [] super().__init__(collection_path, data) def _parse(self): @@ -99,14 +102,31 @@ class ClusterStepTraceTimeDataset(ClusterDataset): step_dict = defaultdict(lambda: [0, 0, 0]) for step_bean in step_data: if step_bean.type == self.RANK: - step_dict[step_bean.index][0] += step_bean.compute - step_dict[step_bean.index][1] += step_bean.communication - step_dict[step_bean.index][2] += step_bean.free + step_rank_record = [] + step = str(step_bean.step).replace(" ", "") or str(const.DEFAULT_STEP) + rank = str(step_bean.index).replace(" ", "") + if step: + step_rank_record.append(step) + if rank: + step_rank_record.append(rank) + + step_rank_index = const.STEP_RANK_SEP.join(step_rank_record) + step_dict[step_rank_index][0] += step_bean.compute + step_dict[step_rank_index][1] += step_bean.communication + step_dict[step_rank_index][2] += step_bean.free + if step_bean.type == self.STAGE: + stage = sorted(list(map(int, re.findall(r'\d+', step_bean.stage)))) + if stage in self._stages: + continue + self._stages.append(stage) return step_dict def get_data(self): return self._step_dict + def get_stages(self): + return sorted(self._stages) + @singleton class ClusterCommunicationDataset(ClusterDataset): @@ -156,7 +176,7 @@ class ClusterCommunicationDataset(ClusterDataset): self.hccl_dict.setdefault(comm_group, defaultdict(lambda: defaultdict(list))) for step, step_dict in group_dict.items(): for op, op_dict in step_dict.items(): - self.compute_bandwidth(op_dict) + self.compute_bandwidth(step.lower().lstrip("step") or str(const.DEFAULT_STEP), op_dict) self.process_hccl_info(comm_group, step, op, op_dict) def process_hccl_info(self, group, step, op, op_dict): @@ -173,7 +193,7 @@ class ClusterCommunicationDataset(ClusterDataset): msg = "[ERROR] Cluster_communication.json has invalid structure." raise ValueError(msg) from e - def compute_bandwidth(self, op_dict: dict): + def compute_bandwidth(self, step, op_dict: dict): for rank_id, rank_dict in op_dict.items(): try: rank = int(rank_id) @@ -182,17 +202,17 @@ class ClusterCommunicationDataset(ClusterDataset): raise ValueError(msg) from e for comm_type, bw_dict in rank_dict.get(self.COMMUNICATION_BANDWIDTH_INFO, {}).items(): if comm_type == self.SDMA: - self.rank_bw_dict[rank][self.SDMA_SIZE_MB] += bw_dict.get(self.TRANSIT_SIZE) - self.rank_bw_dict[rank][self.SDMA_TIME_MS] += bw_dict.get(self.TRANSIT_TIME) + self.rank_bw_dict[f"{step}{const.STEP_RANK_SEP}{rank}"][self.SDMA_SIZE_MB] += bw_dict.get(self.TRANSIT_SIZE) + self.rank_bw_dict[f"{step}{const.STEP_RANK_SEP}{rank}"][self.SDMA_TIME_MS] += bw_dict.get(self.TRANSIT_TIME) if comm_type == self.RDMA: - self.rank_bw_dict[rank][self.RDMA_SIZE_MB] += bw_dict.get(self.TRANSIT_SIZE) - self.rank_bw_dict[rank][self.RDMA_TIME_MS] += bw_dict.get(self.TRANSIT_TIME) - - for rank, rank_dict in self.rank_bw_dict.items(): - self.rank_bw_dict[rank][self.RDMA_BANDWIDTH] = self.compute_ratio( - self.rank_bw_dict[rank][self.RDMA_SIZE_MB], self.rank_bw_dict[rank][self.RDMA_TIME_MS]) - self.rank_bw_dict[rank][self.SDMA_BANDWIDTH] = self.compute_ratio( - self.rank_bw_dict[rank][self.SDMA_SIZE_MB], self.rank_bw_dict[rank][self.SDMA_TIME_MS]) + self.rank_bw_dict[f"{step}{const.STEP_RANK_SEP}{rank}"][self.RDMA_SIZE_MB] += bw_dict.get(self.TRANSIT_SIZE) + self.rank_bw_dict[f"{step}{const.STEP_RANK_SEP}{rank}"][self.RDMA_TIME_MS] += bw_dict.get(self.TRANSIT_TIME) + + for step_rank in self.rank_bw_dict.keys(): + self.rank_bw_dict[step_rank][self.RDMA_BANDWIDTH] = self.compute_ratio( + self.rank_bw_dict[step_rank][self.RDMA_SIZE_MB], self.rank_bw_dict[step_rank][self.RDMA_TIME_MS]) + self.rank_bw_dict[step_rank][self.SDMA_BANDWIDTH] = self.compute_ratio( + self.rank_bw_dict[step_rank][self.SDMA_SIZE_MB], self.rank_bw_dict[step_rank][self.SDMA_TIME_MS]) def get_data(self): return self.rank_bw_dict diff --git a/profiler/advisor/dataset/cluster/cluster_step_trace_time_bean.py b/profiler/advisor/dataset/cluster/cluster_step_trace_time_bean.py index b108fc77a3f3408d48c79ce6b542f98427d88b0b..8ae0e55f2a5fbc05304fd95809e9b69220dfd3e5 100644 --- a/profiler/advisor/dataset/cluster/cluster_step_trace_time_bean.py +++ b/profiler/advisor/dataset/cluster/cluster_step_trace_time_bean.py @@ -65,3 +65,6 @@ class ClusterStepTraceTimeBean: msg = "[ERROR] Cluster step trace time.csv has invalid value in column 'Free'." raise ValueError(msg) from e + @property + def stage(self) -> int: + return self._data.get(self.INDEX) diff --git a/profiler/advisor/display/html/priority_background_color.py b/profiler/advisor/display/html/priority_background_color.py new file mode 100644 index 0000000000000000000000000000000000000000..7da61a093099bc6e9fedf40367176d500b04b185 --- /dev/null +++ b/profiler/advisor/display/html/priority_background_color.py @@ -0,0 +1,4 @@ +class PriorityBackgroundColor: + high = "#B5495B" + medium = "#fcaf17" + low = "#65c294" diff --git a/profiler/advisor/display/html/render.py b/profiler/advisor/display/html/render.py index 3984fa8f34f0858a7281c9b51caaa43a170baf86..0c1882f133e5ec8a94c8a1ee80d9c53f7bda989b 100644 --- a/profiler/advisor/display/html/render.py +++ b/profiler/advisor/display/html/render.py @@ -1,7 +1,7 @@ import os import logging from typing import List, Dict -from collections import defaultdict +from collections import defaultdict, OrderedDict from jinja2 import Environment, FileSystemLoader from profiler.advisor.common import constant @@ -14,31 +14,72 @@ logger = logging.getLogger() @singleton class HTMLRender: + SUPPORTED_KEYS = ["main", "overall", "comparison", "computation", "schedule", "communication", "dataloader", + "memory"] + PERFORMANCE_PROBLEM_ANALYSIS = "performance_problem_analysis" + def __init__(self): self.html = "" self.render_list = defaultdict(list) def render_html(self, template_dir: str = "templates", template_name: str = "main.html", template_header=constant.DEFAULT_TEMPLATE_HEADER): - self.html = self.render_template("main", template_dir, template_name, render_list=self.render_list, + + # 确保overall 和 comparison 在 performance problem analysis 之前 + sorted_render_htmls = OrderedDict() + for key in ["overall", "comparison"]: + if key in self.render_list: + sorted_render_htmls[key] = self.render_list.get(key) + for key, html in self.render_list.items(): + if key in sorted_render_htmls: + continue + sorted_render_htmls[key] = html + + self.html = self.render_template("main", template_dir, template_name, render_list=sorted_render_htmls, template_header=template_header) - def render_template(self, key: str, template_dir: str, template_name: str, **kwargs): + def get_rendered_html(self, key: str, template_dir: str, template_name: str, **kwargs): + if key not in self.SUPPORTED_KEYS: + error_msg = f"Error render template key {key}, optionals are {self.SUPPORTED_KEYS}" + logger.error(error_msg) + raise Exception(error_msg) + if not os.path.isabs(template_dir): template_dir = os.path.join(os.path.dirname(__file__), template_dir) env = Environment(loader=FileSystemLoader(template_dir), autoescape=True) template = env.get_template(template_name) + if "priority" not in kwargs: + kwargs["priority"] = "low priority" rendered_html = template.render(**kwargs) - self.render_list[key].append(rendered_html) + return rendered_html + + def render_template(self, key: str, template_dir: str, template_name: str, **kwargs): + rendered_html = self.get_rendered_html(key, template_dir, template_name, **kwargs) + + if not kwargs.get("add_render_list", True): + return rendered_html + + if key in ["main", "overall", "comparison"]: + if key not in self.render_list: + self.render_list[key] = [] + self.render_list[key].append(rendered_html) + else: + if self.PERFORMANCE_PROBLEM_ANALYSIS not in self.render_list: + self.render_list[self.PERFORMANCE_PROBLEM_ANALYSIS] = {} + if key not in self.render_list[self.PERFORMANCE_PROBLEM_ANALYSIS]: + self.render_list[self.PERFORMANCE_PROBLEM_ANALYSIS][key] = [] + self.render_list[self.PERFORMANCE_PROBLEM_ANALYSIS][key].append(rendered_html) + return rendered_html def save_to_file(self, save_path: str): + save_path = os.path.join(Config().work_path, save_path) if not save_path.endswith(".html"): logger.error("Skip save html file because file name must endswith `.html`, " "but got %s.", os.path.basename(save_path)) return safe_write(self.html, save_path) - logger.info("Save suggestion to %s.", os.path.join(Config().work_path, save_path)) + logger.info("Save suggestion to %s.", save_path) diff --git a/profiler/advisor/display/html/templates/affinity_api.html b/profiler/advisor/display/html/templates/affinity_api.html index 4d12c3e37536392d122f85fc6ef3a4fcc123ef77..2b4fa453271a8b8170690951fcea86c625562f3a 100644 --- a/profiler/advisor/display/html/templates/affinity_api.html +++ b/profiler/advisor/display/html/templates/affinity_api.html @@ -3,9 +3,9 @@

Affinity API Issues

The analysis results of following affinity APIs are based on runtime env - cann-{{ cann_version }} + cann-{{ cann_version }} and - torch-{{ torch_version }} + torch-{{ torch_version }}
@@ -13,7 +13,7 @@ Suggestion: These APIs have no code stack. If parameter 'with_stack=False' was set while profiling, please refer to Ascend PyTorch Profiler to set - 'with_stack=True'. Otherwise, ignore following affinity APIs due to backward broadcast lack of stack. + 'with_stack=True'. Otherwise, ignore following affinity APIs due to backward broadcast lack of stack. {% endif %} {% for api_name, stacks in result.items() %} diff --git a/profiler/advisor/display/html/templates/main.html b/profiler/advisor/display/html/templates/main.html index 3727125b419547fc6a9ac9743eab34e1e1b76256..61c52d1db906ed8754f17458eaaffcd5adfc4fe3 100644 --- a/profiler/advisor/display/html/templates/main.html +++ b/profiler/advisor/display/html/templates/main.html @@ -137,10 +137,21 @@

Performance Optimization Suggestions

+ +
+ Optimization Priority: +
+ High +
+ Medium +
+ Low +
+ {% for key, renders in render_list.items() %} - {% if key == 'operator'%} + {% if key != 'performance_problem_analysis' %}
-

computation

+

{{ key }}

{% for render in renders %} {{render|safe}} @@ -148,14 +159,25 @@
{% else %} +
-

{{ key }}

+

performance problem analysis

- {% for render in renders %} - {{render|safe}} - {% endfor %} + + + {% for sub_key, sub_renders in renders.items() %} +
+

{{ sub_key }}

+
+ {% for render in sub_renders %} + {{render|safe}} + {% endfor %} +
+
+ {% endfor %}
+ {% endif %} {% endfor %}