From f4f65eb82f13f73d58dd93a5049711132b202a6b Mon Sep 17 00:00:00 2001 From: wuyuhan Date: Wed, 31 Jul 2024 10:13:17 +0800 Subject: [PATCH] =?UTF-8?q?=E9=9B=86=E7=BE=A4=E5=88=86=E6=9E=90=E6=A1=86?= =?UTF-8?q?=E6=9E=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../advisor/analyzer/analyzer_controller.py | 183 ++++++++---------- .../analyzer/cluster/slow_link_analyzer.py | 66 +++++-- .../analyzer/cluster/slow_rank_analyzer.py | 100 +++++----- .../ai_core_freq/ai_core_freq_checker.py | 2 +- .../analyzer/computation/operator_checker.py | 34 ++-- .../pp_stage_computation_analyzer.py | 21 +- .../graph_fusion/graph_fusion_analyzer.py | 8 +- profiler/advisor/analyzer/memory/__init__.py | 0 .../analyzer/memory/memory_analyzer.py | 28 --- .../advisor/analyzer/memory/memory_checker.py | 72 ------- profiler/advisor/common/analyzer_scopes.py | 1 - profiler/advisor/common/constant.py | 5 +- .../dataset/cluster/cluster_dataset.py | 26 ++- .../advisor/dataset/timeline_event_dataset.py | 35 ---- .../display/html/templates/memory.html | 18 -- profiler/advisor/interface/interface.py | 5 +- profiler/advisor/rules/memory.yaml | 7 - profiler/advisor/utils/utils.py | 7 + .../test_analyzer_controller.py | 39 ---- .../test_pp_stage_computation_analyzer.py | 17 +- 20 files changed, 246 insertions(+), 428 deletions(-) delete mode 100644 profiler/advisor/analyzer/memory/__init__.py delete mode 100644 profiler/advisor/analyzer/memory/memory_analyzer.py delete mode 100644 profiler/advisor/analyzer/memory/memory_checker.py delete mode 100644 profiler/advisor/display/html/templates/memory.html delete mode 100644 profiler/advisor/rules/memory.yaml diff --git a/profiler/advisor/analyzer/analyzer_controller.py b/profiler/advisor/analyzer/analyzer_controller.py index f9a75f128..ee723d67f 100644 --- a/profiler/advisor/analyzer/analyzer_controller.py +++ b/profiler/advisor/analyzer/analyzer_controller.py @@ -11,11 +11,12 @@ sys.path.append(os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__f from profiler.advisor.analyzer.cluster.slow_rank_analyzer import SlowRankAnalyzer from profiler.advisor.analyzer.cluster.slow_link_analyzer import SlowLinkAnalyzer from profiler.advisor.analyzer.computation.pp_stage_computation_analyzer import PPStageComputationAnalyzer +from profiler.advisor.analyzer.overall.overall_summary_analyzer import OverallSummaryAnalyzer from profiler.advisor.common import constant as const -from profiler.advisor.utils.obs_utils import ObsProfilingFileHandler, enable_mox, MoxUtils +from profiler.advisor.common.analyzer_scopes import SupportedScopes from profiler.advisor.interface.interface import Interface +from profiler.advisor.utils.obs_utils import ObsProfilingFileHandler, enable_mox, MoxUtils from profiler.cluster_analyse.cluster_data_preprocess.pytorch_data_preprocessor import PytorchDataPreprocessor -from profiler.advisor.common.analyzer_scopes import SupportedScopes logger = logging.getLogger() @@ -34,6 +35,56 @@ class AnalyzerController: self.rank_id_map = {} self._is_cluster = False + @staticmethod + def _get_step_rank_for_cluster_statistic_diff(target_cluster_statistic_data, benchmark_cluster_statistic_data, + headers, dimension, get_max=False): + if dimension not in headers: + logger.error("Error dimension %s for cluster statistics data, optionals are %s.", dimension, headers) + return None, None + + dimension_index = headers.index(dimension) + diff_record = [] + for target_row_data, benchmark_row_data in 
zip(target_cluster_statistic_data, benchmark_cluster_statistic_data):
+            target_data = target_row_data[dimension_index]
+            benchmark_data = benchmark_row_data[dimension_index]
+            if not isinstance(target_data, (int, float)) or not isinstance(benchmark_data, (int, float)):
+                continue
+            diff_record.append(target_data - benchmark_data)
+
+        if SlowRankAnalyzer.compute_max_gap_ratio(diff_record, sum(diff_record) / len(
+                diff_record)) < SlowRankAnalyzer.RATIO_THRESHOLD:
+            return None, None
+
+        value = max(diff_record) if get_max else min(diff_record)
+        value_index = diff_record.index(value)
+        step = target_cluster_statistic_data[value_index][headers.index("step")]
+        target_rank_id = target_cluster_statistic_data[value_index][headers.index("rank_id")]
+
+        return step, target_rank_id
+
+    @staticmethod
+    def _get_target_profiling_path_for_obs(profiling_path, rank_id):
+        # Automatically download the matching data from OBS and return the local path.
+
+        if not MoxUtils.check_dir_exist(profiling_path):
+            return None
+        obs_profiling_handler = ObsProfilingFileHandler(collection_path=profiling_path)
+
+        if not obs_profiling_handler.rank_ids:
+            logger.error("Cannot find any file matching '%s' in profiling path %s", const.PROFILER_INFO_FILE_PATTERN,
+                         profiling_path)
+            return None
+
+        if rank_id and rank_id not in obs_profiling_handler.rank_ids:
+            logger.warning("File 'profiler_info_%s.json' does not exist in profiling path %s", rank_id,
+                           profiling_path)
+            return None
+
+        if not rank_id and obs_profiling_handler.rank_ids:
+            rank_id = obs_profiling_handler.rank_ids[0]
+
+        obs_profiling_handler.download_rank_profiling_files([rank_id])
+        target_profiling_path = obs_profiling_handler.get_rank_profiling_local_path(rank_id)
+        return target_profiling_path
+
     def do_analysis(self):
         result_list = []
@@ -114,7 +165,6 @@ class AnalyzerController:
             self.slow_rank_analyzer.optimize(template_key=Interface.OVERALL)
             self.slow_link_analyzer.optimize(template_key=Interface.OVERALL)
         else:
-            from profiler.advisor.analyzer.overall.overall_summary_analyzer import OverallSummaryAnalyzer
             overall_analyzer = OverallSummaryAnalyzer(profiling_path)
             overall_analyzer.optimize()
@@ -157,26 +207,9 @@ class AnalyzerController:
     def communication_analysis(self, profiling_path, benchmark_profiling_path=None, step=None, benchmark_step=None):
         # Communication analysis for any single rank.
-
-        job_list = []
-        # TODO: per current discussions this is mainly bandwidth-based analysis; it still needs to close the loop
-        # on communication retransmission and abnormal operator bandwidth detection.
-        return job_list
-
-    def memory_analysis(self, profiling_path, benchmark_profiling_path=None, step=None, benchmark_step=None):
-        # Memory analysis for any single rank.
-        kwargs = copy.deepcopy(self.kwargs)
         job_list = []
-
-        kwargs["profiling_path"] = profiling_path
-        kwargs["benchmark_profiling_path"] = benchmark_profiling_path
-        kwargs["step"] = step
-        kwargs["benchmark_step"] = benchmark_step
-
-        for dimension in [Interface.MEMORY]:
-            for scope in Interface.get_scope(dimension):
-                interface = Interface(**kwargs)
-                job_list.append((dimension, scope, interface, kwargs))
+        # TODO: per current discussions this is mainly bandwidth-based analysis; it still needs to close the loop
+        # on communication retransmission and abnormal operator bandwidth detection.
         return job_list

     def cluster_schedule_analysis(self, profiling_path):
@@ -236,9 +269,6 @@ class AnalyzerController:
             )
             kwargs = {"stages_profiling_path": stages_profiling_path, "profiling_path": profiling_path}
             job_list.append((Interface.COMPUTATION, SupportedScopes.STAGE_COMPUTE, Interface(**kwargs), kwargs))
-            # pp_stage_computation_analyzer = PPStageComputationAnalyzer()
-            # pp_stage_computation_analyzer.optimize(stages_profiling_path)
-
         else:
             # No pipeline parallel stages: analyze the min/max steps and ranks across all cards.
             logger.info("Without pipeline parallel stage, Global analysis steps and ranks is %s",
@@ -261,23 +291,10 @@ class AnalyzerController:
         return
job_list - def cluster_memory_analysis(self, profiling_path): - # 目标集群profiling数据内存分析,当前memory识别的两个算子,导致的问题都是大的free,因此选择FREE最慢的卡进行分析 - - job_list = [] - global_step_rank = self.slow_rank_analyzer.get_global_step_rank(SlowRankAnalyzer.FREE) - slow_rank_id = global_step_rank.get("maximum", {}).get("rank_id") or self.default_rank_id - slow_step = global_step_rank.get("maximum", {}).get("step") - analysis_profiling_path = self._get_profiling_path_by_rank(profiling_path, slow_rank_id) - - logger.info("Rank %s and step %s with max free", slow_rank_id, slow_step) - - job_list += self.memory_analysis(analysis_profiling_path, step=slow_step) - return job_list - def _single_profiling_comparison(self, profiling_path, benchmark_profiling_path, step=None, benchmark_step=None): # TODO 基于compare tools 对比计算下发 + kwargs = copy.deepcopy(self.kwargs) return [] def _cluster_profiling_comparison(self, profiling_path, benchmark_profiling_path): @@ -289,34 +306,36 @@ class AnalyzerController: benchmark_slow_link_analyzer = SlowLinkAnalyzer(benchmark_profiling_path) # 计算和下发分析 - headers = self.slow_rank_analyzer.format_datas.get("headers", []) - target_cluster_statistic_data = self.slow_rank_analyzer.format_datas.get("data", []) - benchmark_cluster_statistic_data = benchmark_slow_rank_analyzer.format_datas.get("data", []) job_list += self._cluster_data_comparison(profiling_path, benchmark_profiling_path, - target_cluster_statistic_data, - benchmark_cluster_statistic_data, - headers, - [SlowRankAnalyzer.COMPUTE, SlowRankAnalyzer.FREE], + self.slow_rank_analyzer, + benchmark_slow_rank_analyzer, get_max=True) # 通信分析 - headers = self.slow_link_analyzer.format_datas.get("headers", []) - target_communication_data = self.slow_link_analyzer.format_datas.get("data", []) - benchmark_communication_data = benchmark_slow_link_analyzer.format_datas.get("data", []) job_list += self._cluster_data_comparison(profiling_path, benchmark_profiling_path, - target_communication_data, - benchmark_communication_data, - headers, - [SlowLinkAnalyzer.SDMA, SlowLinkAnalyzer.RDMA], + self.slow_link_analyzer, + benchmark_slow_link_analyzer, get_max=False) return job_list - def _cluster_data_comparison(self, profiling_path, benchmark_profiling_path, target_data, benchmark_data, headers, - comparison_dims, get_max=False): + def _cluster_data_comparison(self, profiling_path, benchmark_profiling_path, target_cluster_analyzer, + benchmark_cluster_analyzer, get_max=False): # #low rank/slow link结果逐行对比获取差值最大的rank和step进行单卡分析 job_list = [] + + if isinstance(target_cluster_analyzer, SlowRankAnalyzer): + comparison_dims = [SlowRankAnalyzer.COMPUTE, SlowRankAnalyzer.FREE] + elif isinstance(target_cluster_analyzer, SlowLinkAnalyzer): + comparison_dims = [SlowLinkAnalyzer.SDMA, SlowLinkAnalyzer.RDMA] + else: + return job_list + + target_data = target_cluster_analyzer.format_datas.get("data", []) + benchmark_data = benchmark_cluster_analyzer.format_datas.get("data", []) + headers = benchmark_cluster_analyzer.format_datas.get("headers", []) + if len(target_data) != len(benchmark_data): logger.warning( "Number of rank ids of Benchmark profiling not equals to target profiling, skip cluster comparison.") @@ -367,8 +386,7 @@ class AnalyzerController: # 数据在obs上 logger.info(f"Obs profiling path: %s", profiling_path) - path_list = [file_name.endswith("ascend_pt") for file_name in - MoxUtils.list_directory(profiling_path)] + path_list = [file_name.endswith("ascend_pt") for file_name in MoxUtils.list_directory(profiling_path)] obs_profiling_handler = 
ObsProfilingFileHandler(collection_path=profiling_path) if sum(path_list) >= self.CLUSTER_RANK_THRESHOLD: obs_profiling_handler.download_files_by_pattern(const.CLUSTER_ANALYSIS_FILE_PATTERN) @@ -382,12 +400,12 @@ class AnalyzerController: def _get_profiling_path_by_rank(self, profiling_path, rank_id=None): # 根据传入的集群/单卡 profiling数据路径,以及rank id,获取对应rank的profiling数据路径,如果数据在obs则会自动下载至本地。 - if not profiling_path: - return None + if profiling_path is None: + return profiling_path if profiling_path.startswith(self.OBS_PREFIX): # profiling数据在obs上时,自动下载对应rank的数据至本地,并返回本地路径 - return self._get_target_profiling_path_for_obs(profiling_path, rank_id) + return AnalyzerController._get_target_profiling_path_for_obs(profiling_path, rank_id) return self._get_target_profiling_path_for_local(profiling_path, rank_id) @@ -398,28 +416,6 @@ class AnalyzerController: return rank_id_map.get(rank_id) - def _get_target_profiling_path_for_obs(self, profiling_path, rank_id): - # 自动下载obs上对应数据至本地,并返回本地路径 - - if not MoxUtils.check_dir_exist(profiling_path): - return None - obs_profiling_handler = ObsProfilingFileHandler(collection_path=profiling_path) - - if not obs_profiling_handler.rank_ids: - logger.error("Can not find file like '%s' from profiling path %s", const.PROFILER_INFO_FILE_PATTERN, - profiling_path) - - if rank_id and rank_id not in obs_profiling_handler.rank_ids: - logger.warning("File 'profiler_info_%s.json' not exists in profiling path %s", rank_id, profiling_path) - return None - - if not rank_id and obs_profiling_handler.rank_ids: - rank_id = obs_profiling_handler.rank_ids[0] - - obs_profiling_handler.download_rank_profiling_files([rank_id]) - target_profiling_path = obs_profiling_handler.get_rank_profiling_local_path(rank_id) - return target_profiling_path - def _check_profiling_path_valid(self, profiling_path): # 检查profiling数据路径是否存在,支持obs路径和本地路径 @@ -438,29 +434,4 @@ class AnalyzerController: else: return True - @staticmethod - def _get_step_rank_for_cluster_statistic_diff(target_cluster_statistic_data, benchmark_cluster_statistic_data, - headers, dimension, get_max=False): - if dimension not in headers: - logger.error("Error dimension %s for cluster statistics data, optionals are %s.", dimension, headers) - return None, None - dimension_index = headers.index(dimension) - diff_record = [] - for target_row_data, benchmark_row_data in zip(target_cluster_statistic_data, benchmark_cluster_statistic_data): - target_data = target_row_data[dimension_index] - benchmark_data = benchmark_row_data[dimension_index] - if not isinstance(target_data, (int, float)) or not isinstance(benchmark_data, (int, float)): - continue - diff_record.append(target_data - benchmark_data) - - if SlowRankAnalyzer.compute_max_gap_ratio(diff_record, sum(diff_record) / len( - diff_record)) < SlowRankAnalyzer.RATIO_THRESHOLD: - return None, None - - value = max(diff_record) if get_max else min(diff_record) - value_index = diff_record.index(value) - step = target_cluster_statistic_data[value_index][headers.index("step")] - target_rank_id = target_cluster_statistic_data[value_index][headers.index("rank_id")] - - return step, target_rank_id diff --git a/profiler/advisor/analyzer/cluster/slow_link_analyzer.py b/profiler/advisor/analyzer/cluster/slow_link_analyzer.py index e57704d4f..a287924b1 100644 --- a/profiler/advisor/analyzer/cluster/slow_link_analyzer.py +++ b/profiler/advisor/analyzer/cluster/slow_link_analyzer.py @@ -15,11 +15,16 @@ from collections import defaultdict from typing import Dict, List +import logging + from 
profiler.advisor.analyzer.base_analyzer import BaseAnalyzer
 from profiler.advisor.common import constant
 from profiler.advisor.result.result import OptimizeResult
 from profiler.advisor.result.item import OptimizeItem, OptimizeRecord
 from profiler.advisor.dataset.cluster.cluster_dataset import ClusterCommunicationDataset
+from profiler.advisor.utils.utils import safe_index
+
+logger = logging.getLogger()


 class SlowLinkAnalyzer(BaseAnalyzer):
@@ -57,7 +62,7 @@ class SlowLinkAnalyzer(BaseAnalyzer):

     def optimize(self, **kwargs):
         if self.rank_bw_dict is None:
-            print("Slow link analysis failed due to data loading failure. \
+            logger.error("Slow link analysis failed due to data loading failure. \
                 Please check your cluster_analysis_output folder. \
                 If you are not concerned about this type of data, please ignore this message.")
             return self.result
@@ -76,7 +81,7 @@ class SlowLinkAnalyzer(BaseAnalyzer):
         if len(data_list) > 0:
             avg_bw = round(sum(data_list) / len(data_list), 3)
         else:
-            print("The slow link (identified bottleneck) cannot provide a bottleneck \
+            logger.info("Slow link analysis cannot identify a bottleneck \
                 because the analysis data is missing bandwidth information.")
             return
         self.bottelneck += f'{link_type}: \n' \
@@ -95,8 +100,12 @@ class SlowLinkAnalyzer(BaseAnalyzer):
         details_dict = {}
         headers = list({k for rank_bw_value in self.rank_bw_dict.values() for k in rank_bw_value.keys()})
         headers.sort()
-        data_list = [list(map(int, step_rank.split("-"))) + [rank_bw.get(k, 0) for k in headers] for step_rank, rank_bw
-                     in self.rank_bw_dict.items()]
+
+        data_list = []
+        for step_rank, rank_bw in self.rank_bw_dict.items():
+            step_rank_list = list(map(int, step_rank.split(constant.STEP_RANK_SEP)))
+            value_list = [rank_bw.get(i, 0) for i in headers]
+            data_list.append(step_rank_list + value_list)
         data_list.sort(key=lambda x: (x[0], x[1]))  # sort by step, then by rank_id

         details_dict["headers"] = ["step", "rank_id"] + headers
@@ -115,8 +124,8 @@ class SlowLinkAnalyzer(BaseAnalyzer):
         )
         self.result.add(OptimizeRecord(optimization_item))
-        for i, data in enumerate(self.format_datas["data"]):
-            self.result.add_detail(SlowLinkAnalyzer.SLOW_LINK_ANALYSIS, self.format_datas["headers"], data)
+        for i, data in enumerate(self.format_datas.get("data", [])):
+            self.result.add_detail(SlowLinkAnalyzer.SLOW_LINK_ANALYSIS, self.format_datas.get("headers", []), data)

     def make_render(self, template_key="cluster"):
         result_for_html = {
@@ -141,22 +150,39 @@ class SlowLinkAnalyzer(BaseAnalyzer):
             raise RuntimeError(f"Error bindwidth type {bindwidth_type}, optionals are {bindwidth_key_map.keys()}")
         headers = self.format_datas.get("headers")
-        bindwidth_index = headers.index(bindwidth_key_map.get(bindwidth_type))
-        data_list = [tuple_list[bindwidth_index] for tuple_list in self.format_datas.get("data")]
-        max_bandwidth, min_bandwidth = max(data_list), min(data_list)
-        if self.compute_max_gap_ratio(data_list, sum(data_list) / len(
-                data_list)) < self.RATIO_THRESHOLD:
-            return global_step_rank
+        bindwidth_index = safe_index(headers, bindwidth_key_map.get(bindwidth_type))
+
+        if bindwidth_index is not None:
+            data_list = [tuple_list[bindwidth_index] for tuple_list in self.format_datas.get("data", [])]
+            max_bandwidth, min_bandwidth = max(data_list), min(data_list)
+
+            if self.compute_max_gap_ratio(data_list, sum(data_list) / len(
+                    data_list)) < self.RATIO_THRESHOLD:
+                return global_step_rank

-        max_bandwidth_index = data_list.index(max_bandwidth)
-        min_bandwidth_index = data_list.index(min_bandwidth)
-
max_bandwidth_rank_id = self.format_datas.get("data")[max_bandwidth_index][headers.index("rank_id")] - max_bandwidth_step = self.format_datas.get("data")[max_bandwidth_index][headers.index("step")] - min_bandwidth_rank_id = self.format_datas.get("data")[min_bandwidth_index][headers.index("rank_id")] - min_bandwidth_step = self.format_datas.get("data")[min_bandwidth_index][headers.index("step")] + max_bandwidth_index = data_list.index(max_bandwidth) + min_bandwidth_index = data_list.index(min_bandwidth) - global_step_rank["maximum"] = {"rank_id": max_bandwidth_rank_id, "step": max_bandwidth_step} - global_step_rank["minimum"] = {"rank_id": min_bandwidth_rank_id, "step": min_bandwidth_step} + rank_id_index = safe_index(headers, "rank_id") + step_index = safe_index(headers, "step") + + if rank_id_index is None: + return global_step_rank + + max_bandwidth_rank_id = self.format_datas.get("data")[max_bandwidth_index][rank_id_index] + min_bandwidth_rank_id = self.format_datas.get("data")[min_bandwidth_index][rank_id_index] + + if step_index is None: + max_bandwidth_step, min_bandwidth_step = constant.DEFAULT_STEP, constant.DEFAULT_STEP + else: + max_bandwidth_step = self.format_datas.get("data")[max_bandwidth_index][step_index] + min_bandwidth_step = self.format_datas.get("data")[min_bandwidth_index][step_index] + + global_step_rank["maximum"] = {"rank_id": max_bandwidth_rank_id, "step": max_bandwidth_step} + global_step_rank["minimum"] = {"rank_id": min_bandwidth_rank_id, "step": min_bandwidth_step} return global_step_rank + + def get_priority(self): + pass \ No newline at end of file diff --git a/profiler/advisor/analyzer/cluster/slow_rank_analyzer.py b/profiler/advisor/analyzer/cluster/slow_rank_analyzer.py index e04add6ba..7ac04c88e 100644 --- a/profiler/advisor/analyzer/cluster/slow_rank_analyzer.py +++ b/profiler/advisor/analyzer/cluster/slow_rank_analyzer.py @@ -13,13 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from collections import defaultdict
-from typing import Dict, List
+import logging
+
 from profiler.advisor.analyzer.base_analyzer import BaseAnalyzer
 from profiler.advisor.common import constant
 from profiler.advisor.result.result import OptimizeResult
 from profiler.advisor.result.item import OptimizeItem, OptimizeRecord
 from profiler.advisor.dataset.cluster.cluster_dataset import ClusterStepTraceTimeDataset
+from profiler.advisor.utils.utils import safe_index
+
+logger = logging.getLogger()


 class SlowRankAnalyzer(BaseAnalyzer):
@@ -35,7 +38,7 @@ class SlowRankAnalyzer(BaseAnalyzer):
     def __init__(self, collection_path, n_processes: int = 1, **kwargs):
         super().__init__(collection_path, n_processes, **kwargs)
         key = ClusterStepTraceTimeDataset.get_key()
-        self.step_trace_class = self.get_first_data_by_key(self.dataset_list, key) 
+        self.step_trace_class = self.get_first_data_by_key(self.dataset_list, key)
         self.step_trace_dict = self.step_trace_class.get_data()
         self.stages = self.step_trace_class.get_stages()
         self.result = OptimizeResult()
@@ -48,16 +51,23 @@ class SlowRankAnalyzer(BaseAnalyzer):
     def steps(self):
         return sorted(list(self._steps))

+    @staticmethod
+    def compute_max_gap_ratio(data: list, mean: float):
+        if mean == 0:
+            return 0
+        else:
+            return (max(data) - min(data)) / mean
+
     def optimize(self, **kwargs):
         if self.step_trace_dict is None:
-            print("slow_rank 分析失败,原因是数据加载失败,请检查你的cluster_analysis_outpu文件夹 \
-                如不关心这类数据请忽略")
+            logger.error("Slow rank analysis failed due to data loading failure. \
+                Please check your cluster_analysis_output folder. \
+                If you are not concerned about this type of data, please ignore this message.")
             return self.result
         self.process()
         self.make_record()
         self.make_render(kwargs.get("template_key"))
         return self.result
-    
+
     def process(self):
         total_time_list = [sum(data_tuple) for rank_id, data_tuple in self.step_trace_dict.items()]
         if total_time_list:
@@ -77,7 +87,6 @@ class SlowRankAnalyzer(BaseAnalyzer):
                                f' because the max difference of {self.BOTTLENECK_LIST[produce_type]} time \n' \
                                f' has reached {round(max_ratio * mean_total_time / 1000, 3)}ms. 
\n' - def make_record(self): """ make record for what and how to optimize @@ -89,15 +98,15 @@ class SlowRankAnalyzer(BaseAnalyzer): self.suggestion ) self.result.add(OptimizeRecord(optimization_item)) - for i, data in enumerate(self.format_datas["data"]): - self.result.add_detail(SlowRankAnalyzer.SLOW_RANK_ANALYSIS, self.format_datas["headers"], data) + for i, data in enumerate(self.format_datas.get("data", [])): + self.result.add_detail(SlowRankAnalyzer.SLOW_RANK_ANALYSIS, self.format_datas.get("headers", []), data) def format_details(self): details_dict = {} headers = ["step", "rank_id", "compute(us)", "communication(us)", "free(us)"] data_list = [] for key, value in self.step_trace_dict.items(): - step, rank_id = key.split("-") + step, rank_id = key.split(constant.STEP_RANK_SEP) data_list.append([int(step), int(rank_id)] + value) if step and step not in self._steps: self._steps.add(step) @@ -108,9 +117,9 @@ class SlowRankAnalyzer(BaseAnalyzer): def make_render(self, template_key="cluster"): result_for_html = { - "Description" : self.bottelneck, - "suggestion" : self.suggestion, - "details" : [self.format_datas] + "Description": self.bottelneck, + "suggestion": self.suggestion, + "details": [self.format_datas] } self.html_render.render_template(key=template_key, @@ -121,36 +130,17 @@ class SlowRankAnalyzer(BaseAnalyzer): torch_version=self.torch_version, result=result_for_html) - @staticmethod - def compute_max_gap_ratio(data: list, mean: float): - if mean == 0: - return 0 - else: - return (max(data) - min(data)) / mean - - @staticmethod - def get_min_max_rank_id(slow_rank_result, dimension): - slow_rank_data = slow_rank_result.get(SlowRankAnalyzer.SLOW_RANK_ANALYSIS) - headers = slow_rank_data.get("headers") - dimension_index = headers.index(dimension) - data_list = [tuple_list[dimension_index] for tuple_list in slow_rank_data.get("data")] - max_time, min_time = max(data_list), min(data_list) - - if SlowRankAnalyzer.compute_max_gap_ratio(data_list, sum(data_list) / len( - data_list)) < SlowRankAnalyzer.RATIO_THRESHOLD: - return None, None - max_time_index = data_list.index(max_time) - min_time_index = data_list.index(min_time) - max_time_rank_id = slow_rank_data.get("data")[max_time_index][headers.index("rank_id")] - min_time_rank_id = slow_rank_data.get("data")[min_time_index][headers.index("rank_id")] - - return max_time_rank_id, min_time_rank_id - def get_global_step_rank(self, dimension): global_step_rank = {} headers = self.format_datas.get("headers") - dimension_index = headers.index(dimension) + + dimension_index = safe_index(headers, dimension) + rank_id_index = safe_index(headers, "rank_id") + step_index = safe_index(headers, "step") + if dimension_index is None or rank_id_index is None: + return global_step_rank + data_list = [tuple_list[dimension_index] for tuple_list in self.format_datas.get("data")] max_time, min_time = max(data_list), min(data_list) @@ -159,10 +149,15 @@ class SlowRankAnalyzer(BaseAnalyzer): return global_step_rank max_time_index = data_list.index(max_time) min_time_index = data_list.index(min_time) - max_time_rank_id = self.format_datas.get("data")[max_time_index][headers.index("rank_id")] - max_time_step = self.format_datas.get("data")[max_time_index][headers.index("step")] - min_time_rank_id = self.format_datas.get("data")[min_time_index][headers.index("rank_id")] - min_time_step = self.format_datas.get("data")[min_time_index][headers.index("step")] + + max_time_rank_id = self.format_datas.get("data")[max_time_index][rank_id_index] + min_time_rank_id = 
self.format_datas.get("data")[min_time_index][rank_id_index] + + if step_index is not None: + max_time_step = self.format_datas.get("data")[max_time_index][step_index] + min_time_step = self.format_datas.get("data")[min_time_index][step_index] + else: + max_time_step, min_time_step = constant.DEFAULT_STEP, constant.DEFAULT_STEP global_step_rank["maximum"] = {"rank_id": max_time_rank_id, "step": max_time_step} global_step_rank["minimum"] = {"rank_id": min_time_rank_id, "step": min_time_step} @@ -170,16 +165,22 @@ class SlowRankAnalyzer(BaseAnalyzer): return global_step_rank def get_stage_step_rank(self, dimension): + stage_step_rank = {} + headers = self.format_datas.get("headers") - dimension_index = headers.index(dimension) - rank_index = headers.index("rank_id") - step_index = headers.index("step") + dimension_index = safe_index(headers, dimension) + rank_id_index = safe_index(headers, "rank_id") + step_index = safe_index(headers, "step") + if dimension_index is None or rank_id_index is None: + return stage_step_rank - step_list = [tuple_list[step_index] for tuple_list in self.format_datas.get("data")] - rank_list = [tuple_list[rank_index] for tuple_list in self.format_datas.get("data")] + rank_list = [tuple_list[rank_id_index] for tuple_list in self.format_datas.get("data")] cost_time_list = [tuple_list[dimension_index] for tuple_list in self.format_datas.get("data")] - stage_step_rank = {} + if step_index is not None: + step_list = [tuple_list[step_index] for tuple_list in self.format_datas.get("data")] + else: + step_list = [constant.DEFAULT_STEP] * len(rank_list) for index, stage in enumerate(self.stages): tmp_step_list, tmp_rank_list, tmp_time_list = [], [], [] @@ -206,3 +207,6 @@ class SlowRankAnalyzer(BaseAnalyzer): "step": tmp_step_list[min_time_index]} return stage_step_rank + + def get_priority(self): + pass \ No newline at end of file diff --git a/profiler/advisor/analyzer/computation/ai_core_freq/ai_core_freq_checker.py b/profiler/advisor/analyzer/computation/ai_core_freq/ai_core_freq_checker.py index 87239553c..e4b36914b 100644 --- a/profiler/advisor/analyzer/computation/ai_core_freq/ai_core_freq_checker.py +++ b/profiler/advisor/analyzer/computation/ai_core_freq/ai_core_freq_checker.py @@ -93,7 +93,7 @@ class AICoreFreqChecker: def make_render(self, html_render, add_render_list=True): if not self.ai_core_freq_issues: - return + return None if self.SHOW_TOPK_OPS: self.desc += f" Only show {self.SHOW_TOPK_OPS} operators here, see latest mstt_advisor.xlsx for details." 
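
Note on the slow-rank hardening above: bare headers.index(...) lookups are replaced with safe_index, the min/max selection is gated on compute_max_gap_ratio, and DEFAULT_STEP serves as a fallback when no step column exists. Below is a minimal, self-contained sketch of that selection logic; the sample table and the RATIO_THRESHOLD value are illustrative only, and both helpers are re-implemented here so the snippet runs on its own.

    RATIO_THRESHOLD = 0.05  # illustrative; the real threshold is defined on SlowRankAnalyzer

    def safe_index(array, value, return_index_if_error=None):
        # Mirrors profiler.advisor.utils.utils.safe_index: list.index() without ValueError.
        return array.index(value) if value in array else return_index_if_error

    def compute_max_gap_ratio(data, mean):
        # (max - min) / mean, guarding against a zero mean.
        return 0 if mean == 0 else (max(data) - min(data)) / mean

    def get_global_step_rank(headers, rows, dimension):
        dimension_index = safe_index(headers, dimension)
        rank_id_index = safe_index(headers, "rank_id")
        step_index = safe_index(headers, "step")
        if dimension_index is None or rank_id_index is None:
            return {}  # missing columns now yield an empty result instead of a ValueError
        data_list = [row[dimension_index] for row in rows]
        if not data_list:
            return {}
        if compute_max_gap_ratio(data_list, sum(data_list) / len(data_list)) < RATIO_THRESHOLD:
            return {}  # ranks are balanced enough; nothing to single out
        default_step = "-1"  # DEFAULT_STEP fallback when the step column is absent

        def pick(i):
            return {"rank_id": rows[i][rank_id_index],
                    "step": rows[i][step_index] if step_index is not None else default_step}

        return {"maximum": pick(data_list.index(max(data_list))),
                "minimum": pick(data_list.index(min(data_list)))}

    headers = ["step", "rank_id", "compute(us)", "communication(us)", "free(us)"]
    rows = [[1, 0, 1000.0, 200.0, 50.0],
            [1, 1, 1800.0, 210.0, 40.0]]
    print(get_global_step_rank(headers, rows, "compute(us)"))
    # {'maximum': {'rank_id': 1, 'step': 1}, 'minimum': {'rank_id': 0, 'step': 1}}
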
diff --git a/profiler/advisor/analyzer/computation/operator_checker.py b/profiler/advisor/analyzer/computation/operator_checker.py index 0df3a143f..908659643 100644 --- a/profiler/advisor/analyzer/computation/operator_checker.py +++ b/profiler/advisor/analyzer/computation/operator_checker.py @@ -40,6 +40,23 @@ class OperatorChecker(VersionControl): self.cann_version = cann_version self._op_list: List[OpInfo] = [] + @classmethod + def get_name(cls): + """ + get name of checker + :return: checker name + """ + return cls._PROBLEM + + @staticmethod + def get_ratio(op_info: OpInfo, attr: str) -> float: + if not op_info.has_attr(attr): + return 0 + value = op_info.get_attr(attr) + if not value or value == "N/A": + return 0 + return float(value) + def check(self, profiling_data: ProfilingDataset) -> bool: """ check if any operator need optimize @@ -243,14 +260,6 @@ class OperatorChecker(VersionControl): """Get node views.""" return [] - @classmethod - def get_name(cls): - """ - get name of checker - :return: checker name - """ - return cls._PROBLEM - def get_incomes(self) -> float: """get incomes""" incomes = 0.0 @@ -274,15 +283,6 @@ class OperatorChecker(VersionControl): return False return True - @staticmethod - def get_ratio(op_info: OpInfo, attr: str) -> float: - if not op_info.has_attr(attr): - return 0 - value = op_info.get_attr(attr) - if not value or value == "N/A": - return 0 - return float(value) - def get_details(self) -> list: """ get details of operator to be optimized diff --git a/profiler/advisor/analyzer/computation/pp_stage_computation_analyzer.py b/profiler/advisor/analyzer/computation/pp_stage_computation_analyzer.py index 76b66053a..33021e2ca 100644 --- a/profiler/advisor/analyzer/computation/pp_stage_computation_analyzer.py +++ b/profiler/advisor/analyzer/computation/pp_stage_computation_analyzer.py @@ -21,6 +21,7 @@ class PPStageComputationAnalyzer(BaseAnalyzer): AICoreFreqAnalyzer] def __init__(self, collection_path, **kwargs): + super().__init__(collection_path, **kwargs) self.collection_path = collection_path self._stages_rendered_html = Manager().list() self._multiprocess_result = Manager().dict() @@ -28,6 +29,12 @@ class PPStageComputationAnalyzer(BaseAnalyzer): self.html_render = None self.result = None + @staticmethod + def _get_valid_sheet_name(sheet_name, prefix): + if not sheet_name.lower().startswith(prefix.lower()): + sheet_name = f"{prefix} {sheet_name}" + return sheet_name + def optimize(self, stages_profiling_path, **kwargs): pp_stage_processes = min(int(os.getenv("PP_STAGE_ANALYSIS_PROCESSES", 0)), len(stages_profiling_path), @@ -54,8 +61,8 @@ class PPStageComputationAnalyzer(BaseAnalyzer): priority_background_color=PriorityBackgroundColor.high) def _optimize(self, profiling_path, **kwargs): - stage_html_record = dict(stage=kwargs.get("stage"), rank_id=kwargs.get("rank_id"), step=kwargs.get("step"), - html_list=[]) + html_list = [] + stage_html_record = dict(stage=kwargs.get("stage"), rank_id=kwargs.get("rank_id"), step=kwargs.get("step")) kwargs["add_render_list"] = False for analyzer_cls in self.STAGE_ANALYZER_LIST: @@ -64,7 +71,9 @@ class PPStageComputationAnalyzer(BaseAnalyzer): if hasattr(result, "data") and result.data: self.result = result if hasattr(analyzer, "html") and analyzer.html: - stage_html_record["html_list"].append(analyzer.html) + html_list.append(analyzer.html) + stage_html_record["html_list"] = html_list + self._stages_rendered_html.append(stage_html_record) self._multiprocess_result[f"rank {kwargs.get('rank_id')}".capitalize()] = 
result.data @@ -92,9 +101,3 @@ class PPStageComputationAnalyzer(BaseAnalyzer): for row in data: self.result.add_detail(sheet_name, detail=row) - - @staticmethod - def _get_valid_sheet_name(sheet_name, prefix): - if not sheet_name.lower().startswith(prefix.lower()): - sheet_name = f"{prefix} {sheet_name}" - return sheet_name diff --git a/profiler/advisor/analyzer/graph_fusion/graph_fusion_analyzer.py b/profiler/advisor/analyzer/graph_fusion/graph_fusion_analyzer.py index b78ac3d79..3cbf5f76e 100644 --- a/profiler/advisor/analyzer/graph_fusion/graph_fusion_analyzer.py +++ b/profiler/advisor/analyzer/graph_fusion/graph_fusion_analyzer.py @@ -42,10 +42,4 @@ class FusionOPAnalyzer(BaseAnalyzer): else: checker.find_fusion_matched_issues_with_times(graph_data, profiling_data) checker.make_record(self.result) - self.html = checker.make_render(self.html_render, add_render_list) - - def make_record(self): - pass - - def make_render(self): - pass + self.html = checker.make_render(self.html_render, add_render_list) \ No newline at end of file diff --git a/profiler/advisor/analyzer/memory/__init__.py b/profiler/advisor/analyzer/memory/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/profiler/advisor/analyzer/memory/memory_analyzer.py b/profiler/advisor/analyzer/memory/memory_analyzer.py deleted file mode 100644 index 7738cc23d..000000000 --- a/profiler/advisor/analyzer/memory/memory_analyzer.py +++ /dev/null @@ -1,28 +0,0 @@ -import logging - -from profiler.advisor.analyzer.base_analyzer import BaseAnalyzer -from profiler.advisor.result.result import OptimizeResult -from profiler.advisor.analyzer.memory.memory_checker import MemoryOpsChecker -from profiler.advisor.display.html.render import HTMLRender -from profiler.advisor.dataset.timeline_event_dataset import TimelineEventDataset - -logger = logging.getLogger() - - -class MemoryAnalyzer(BaseAnalyzer): - dataset_cls_list = [TimelineEventDataset] - - def __init__(self, collection_path, n_processes: int = 1, **kwargs) -> None: - super().__init__(collection_path, n_processes, **kwargs) - key = TimelineEventDataset.get_key() - self.dataset = self.get_first_data_by_key(self.dataset_list, key) - self.result = OptimizeResult() - self.html_render = HTMLRender() - - @BaseAnalyzer.check_data((TimelineEventDataset.get_key(),)) - def optimize(self, **kwargs): - memory_checker = MemoryOpsChecker() - memory_checker.check_memory_ops(self.dataset) - memory_checker.make_record(self.result) - memory_checker.make_render(self.html_render) - return self.result diff --git a/profiler/advisor/analyzer/memory/memory_checker.py b/profiler/advisor/analyzer/memory/memory_checker.py deleted file mode 100644 index 4042dae9a..000000000 --- a/profiler/advisor/analyzer/memory/memory_checker.py +++ /dev/null @@ -1,72 +0,0 @@ -import os -import re -import logging -import yaml - -from profiler.advisor.dataset.timeline_event_dataset import TimelineEventDataset, MemCollector -from profiler.advisor.result.result import OptimizeResult -from profiler.advisor.result.item import OptimizeItem, OptimizeRecord -from profiler.cluster_analyse.common_func.file_manager import FileManager - -logger = logging.getLogger() - - -class MemoryOpsChecker: - - def __init__(self): - - self.memory_issues = False - self.optimization_item = [] - self.desc = "" - self.suggestions = [] - self.memory_ops_duration_threshold = None - - def check_memory_ops(self, event_dataset: TimelineEventDataset): - """ - :Param event_dataset: dataset of timeline event - """ - if not 
hasattr(event_dataset, "memory_ops") or not getattr(event_dataset, "memory_ops") or \ - not event_dataset.memory_ops.mem_op_info: - logger.debug("Skip slow memory ops checker, because no memory ops: %s", MemCollector.MEMORY_OP_NAME) - return - - rule = event_dataset.memory_ops.rule - max_dur = rule.get("max_total_duration") - raw_problem = rule.get("problem") - - for memory_op_name, memory_op_info in event_dataset.memory_ops.mem_op_info.items(): - # convert to milli-sec - op_dur = memory_op_info.get("total_dur") / 1000 - op_count = memory_op_info.get("count") - if op_dur < max_dur: - continue - - self.memory_issues = True - self.desc += raw_problem.format(memory_op_num=op_count, memory_op_name=memory_op_name, - memory_op_dur=op_dur) + " " - for solution in rule.get("solutions", []): - if memory_op_name not in solution: - continue - suggestion = solution.get(memory_op_name, {}).get("desc") - - self.suggestions.append(f"{suggestion} for optimize memory operator {memory_op_name}") - - def make_record(self, result: OptimizeResult): - """ - make record for what and how to optimize - """ - if not self.memory_issues: - return - - self.optimization_item.append(OptimizeItem("Memory", self.desc, self.suggestions)) - for optimization in self.optimization_item: - result.add(OptimizeRecord(optimization)) - - def make_render(self, html_render): - if not self.memory_issues: - return - html_render.render_template(key="memory", - template_dir="templates", - template_name="memory.html", - desc=self.desc, - suggestions=self.suggestions) diff --git a/profiler/advisor/common/analyzer_scopes.py b/profiler/advisor/common/analyzer_scopes.py index e6726e082..62d7f6fe4 100644 --- a/profiler/advisor/common/analyzer_scopes.py +++ b/profiler/advisor/common/analyzer_scopes.py @@ -16,5 +16,4 @@ class SupportedScopes: SYNCBN = "syncbn" SYNCHRONIZE_STREAM = "synchronize_stream" FREQ_ANALYSIS = "freq_analysis" - MEMORY = "memory" STAGE_COMPUTE = "stage_compute" diff --git a/profiler/advisor/common/constant.py b/profiler/advisor/common/constant.py index e44619ee2..486a1fc6c 100644 --- a/profiler/advisor/common/constant.py +++ b/profiler/advisor/common/constant.py @@ -33,6 +33,7 @@ TASK_TYPE = "Task Type" CPU_OP = "cpu_op" AI_CORE = "AI_CORE" AI_CPU = "AI_CPU" +MIX_AIC = "MIX_AIC" CALL_STACKS = "Call stack" INPUT_DIMS = "Input Dims" OP_SEP = "-" @@ -150,4 +151,6 @@ DISABLE_STREAMINIG_READER = "DISABLE_STREAMINIG_READER" FRAMEWORK_STACK_BLACK_LIST = ["torch", "torch_npu", "megatron", "deepspeed"] DISABLE_STREAMING_READER = "DISABLE_STREAMING_READER" MAX_FILE_SIZE = 10**10 -MAX_NUM_PROCESSES = 4 \ No newline at end of file +MAX_NUM_PROCESSES = 4 +DEFAULT_STEP = "-1" +STEP_RANK_SEP = "_" \ No newline at end of file diff --git a/profiler/advisor/dataset/cluster/cluster_dataset.py b/profiler/advisor/dataset/cluster/cluster_dataset.py index 1f299afc5..9def21c1a 100644 --- a/profiler/advisor/dataset/cluster/cluster_dataset.py +++ b/profiler/advisor/dataset/cluster/cluster_dataset.py @@ -87,7 +87,15 @@ class ClusterStepTraceTimeDataset(ClusterDataset): step_dict = defaultdict(lambda: [0, 0, 0]) for step_bean in step_data: if step_bean.type == self.RANK: - step_rank_index = f"{step_bean.step}-{step_bean.index}" + step_rank_record = [] + step = str(step_bean.step).replace(" ", "") or str(const.DEFAULT_STEP) + rank = str(step_bean.index).replace(" ", "") + if step: + step_rank_record.append(step) + if rank: + step_rank_record.append(rank) + + step_rank_index = const.STEP_RANK_SEP.join(step_rank_record) step_dict[step_rank_index][0] += 
step_bean.compute step_dict[step_rank_index][1] += step_bean.communication step_dict[step_rank_index][2] += step_bean.free @@ -146,10 +154,10 @@ class ClusterCommunicationDataset(ClusterDataset): return True def process(self, communication_json: dict): - for comm_group, group_dict in communication_json.items(): + for group_dict in communication_json.values(): for step, step_dict in group_dict.items(): - for op, op_dict in step_dict.items(): - self.compute_bandwidth(step.lower().lstrip("step"), op_dict) + for op_dict in step_dict.values(): + self.compute_bandwidth(step.lower().lstrip("step") or str(const.DEFAULT_STEP), op_dict) def compute_bandwidth(self, step, op_dict: dict): for rank_id, rank_dict in op_dict.items(): @@ -160,13 +168,13 @@ class ClusterCommunicationDataset(ClusterDataset): raise ValueError(msg) from e for comm_type, bw_dict in rank_dict.get(self.COMMUNICATION_BANDWIDTH_INFO, {}).items(): if comm_type == self.SDMA: - self.rank_bw_dict[f"{step}-{rank}"][self.SDMA_SIZE_MB] += bw_dict.get(self.TRANSIT_SIZE) - self.rank_bw_dict[f"{step}-{rank}"][self.SDMA_TIME_MS] += bw_dict.get(self.TRANSIT_TIME) + self.rank_bw_dict[f"{step}{const.STEP_RANK_SEP}{rank}"][self.SDMA_SIZE_MB] += bw_dict.get(self.TRANSIT_SIZE) + self.rank_bw_dict[f"{step}{const.STEP_RANK_SEP}{rank}"][self.SDMA_TIME_MS] += bw_dict.get(self.TRANSIT_TIME) if comm_type == self.RDMA: - self.rank_bw_dict[f"{step}-{rank}"][self.RDMA_SIZE_MB] += bw_dict.get(self.TRANSIT_SIZE) - self.rank_bw_dict[f"{step}-{rank}"][self.RDMA_TIME_MS] += bw_dict.get(self.TRANSIT_TIME) + self.rank_bw_dict[f"{step}{const.STEP_RANK_SEP}{rank}"][self.RDMA_SIZE_MB] += bw_dict.get(self.TRANSIT_SIZE) + self.rank_bw_dict[f"{step}{const.STEP_RANK_SEP}{rank}"][self.RDMA_TIME_MS] += bw_dict.get(self.TRANSIT_TIME) - for step_rank, rank_dict in self.rank_bw_dict.items(): + for step_rank in self.rank_bw_dict.keys(): self.rank_bw_dict[step_rank][self.RDMA_BANDWIDTH] = self.compute_ratio( self.rank_bw_dict[step_rank][self.RDMA_SIZE_MB], self.rank_bw_dict[step_rank][self.RDMA_TIME_MS]) self.rank_bw_dict[step_rank][self.SDMA_BANDWIDTH] = self.compute_ratio( diff --git a/profiler/advisor/dataset/timeline_event_dataset.py b/profiler/advisor/dataset/timeline_event_dataset.py index 71419dbbd..1504e65f5 100644 --- a/profiler/advisor/dataset/timeline_event_dataset.py +++ b/profiler/advisor/dataset/timeline_event_dataset.py @@ -76,31 +76,6 @@ class SynchronizeStreamCollector: self._slow_synchronize_stream = [] -class MemCollector: - MEMORY_OP_NAME = ["AscendCL@aclMallocMemInner", "AscendCL@aclrtFreePhysical"] - - def __init__(self): - self.mem_op_info = {} - self.rule = self._load_rule() - - @staticmethod - def _load_rule(): - memory_rule_path = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "rules", - "memory.yaml") - - memory_rule = FileManager.read_yaml_file(memory_rule_path) - return memory_rule - - def append_mem_op(self, event): - if event.name not in self.MEMORY_OP_NAME: - return - - if event.name not in self.mem_op_info: - self.mem_op_info[event.name] = dict(count=0, total_dur=0) - self.mem_op_info[event.name]["count"] += 1 - self.mem_op_info[event.name]["total_dur"] += float(event.dur) - - @singleton class TimelineEventDataset: @@ -108,7 +83,6 @@ class TimelineEventDataset: self._ops_with_task_type = {} self._ops_with_stack = {} self._ops_compile = OpCompileCollector() - self._memory_ops = MemCollector() self._torch_to_npu = {} self._acl_to_npu = set() self._aten: List[Any] = [] @@ -145,10 +119,6 @@ class TimelineEventDataset: def 
ops_compile(self): return self._ops_compile - @property - def memory_ops(self): - return self._memory_ops - @property def torch_to_npu(self): return self._torch_to_npu @@ -283,9 +253,6 @@ class TimelineEventDataset: "name": event.name, "ts": event.ts, "dur": event.dur })) - def _add_memory_ops(self, event: TimelineEvent): - self._memory_ops.append_mem_op(event) - def _add_specific_operator(self, event): # for analysis of operator aclOpCompile, enable jit_compILE=False self._add_op_compile(event) @@ -293,8 +260,6 @@ class TimelineEventDataset: self._add_dataloader(event) # for analysis of syncBatchNorm operator, prompt users to replace source code of torch_npu's syncbn self._add_sync_batchnorm(event) - # for analysis of memory - self._add_memory_ops(event) def _add_event(self, index, event): event["dataset_index"] = index diff --git a/profiler/advisor/display/html/templates/memory.html b/profiler/advisor/display/html/templates/memory.html deleted file mode 100644 index c35070109..000000000 --- a/profiler/advisor/display/html/templates/memory.html +++ /dev/null @@ -1,18 +0,0 @@ -
-<div class="panel panel-default">
-    <div class="panel-heading">
-        <h5 class="panel-title">Memory Operator Issues</h5>
-    </div>
-    <div class="panel-body">
-        <p>{{ desc }}</p>
-        <table class="table">
-            <tr>
-                <th>Suggestions</th>
-            </tr>
-            {% for suggestion in suggestions %}
-            <tr>
-                <td>{{ loop.index }}. {{ suggestion|safe }}</td>
-            </tr>
-            {% endfor %}
-        </table>
-    </div>
-</div>
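
Context for the STEP_RANK_SEP and DEFAULT_STEP constants added to constant.py and threaded through cluster_dataset.py above: step-rank keys used to be joined with "-", which cannot round-trip once the default step "-1" appears in a key. A short, runnable sketch of the new key convention follows; the constant values come from the patch, while the helper names are made up for illustration.

    DEFAULT_STEP = "-1"   # value added to constant.py in this patch
    STEP_RANK_SEP = "_"   # value added to constant.py in this patch

    def build_step_rank_key(step, rank):
        # Empty or whitespace-only steps fall back to DEFAULT_STEP, mirroring
        # the step-key construction in ClusterStepTraceTimeDataset above.
        step = str(step).replace(" ", "") or DEFAULT_STEP
        rank = str(rank).replace(" ", "")
        return STEP_RANK_SEP.join([step, rank])

    def parse_step_rank_key(key):
        # SlowRankAnalyzer.format_details splits on the same separator; with the old
        # "-" separator, "-1-7".split("-") would yield ['', '1', '7'] and fail to unpack.
        step, rank = key.split(STEP_RANK_SEP)
        return int(step), int(rank)

    assert build_step_rank_key("", 7) == "-1_7"
    assert parse_step_rank_key("-1_7") == (-1, 7)
    assert parse_step_rank_key(build_step_rank_key(3, 0)) == (3, 0)
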
diff --git a/profiler/advisor/interface/interface.py b/profiler/advisor/interface/interface.py index 7286b7997..39c64ca90 100644 --- a/profiler/advisor/interface/interface.py +++ b/profiler/advisor/interface/interface.py @@ -21,7 +21,6 @@ from profiler.advisor.analyzer.schedule.syncbn.syncbn_analyzer import SyncBNAnal from profiler.advisor.analyzer.schedule.synchronize_stream.synchronize_stream_analyzer import SynchronizeStreamAnalyzer from profiler.advisor.analyzer.dataloader.dataloader_analyzer import DataloaderAnalyzer from profiler.advisor.analyzer.computation.ai_core_freq.ai_core_freq_analyzer import AICoreFreqAnalyzer -from profiler.advisor.analyzer.memory.memory_analyzer import MemoryAnalyzer from profiler.advisor.analyzer.computation.pp_stage_computation_analyzer import PPStageComputationAnalyzer class Interface: @@ -31,7 +30,6 @@ class Interface: OVERALL = "overall" DATALOADER = "dataloader" CLUSTER = "cluster" - MEMORY = "memory" supported_analyzer = { SCHEDULE: OrderedDict({ @@ -55,8 +53,7 @@ class Interface: CLUSTER: OrderedDict({ SupportedScopes.SLOW_RANK: SlowRankAnalyzer, SupportedScopes.SLOW_LINK: SlowLinkAnalyzer - }), - MEMORY: OrderedDict({SupportedScopes.MEMORY: MemoryAnalyzer}) + }) } all_dimension = list(supported_analyzer.keys()) diff --git a/profiler/advisor/rules/memory.yaml b/profiler/advisor/rules/memory.yaml deleted file mode 100644 index 8500c0ad6..000000000 --- a/profiler/advisor/rules/memory.yaml +++ /dev/null @@ -1,7 +0,0 @@ -problem: "Found {memory_op_num} {memory_op_name}, cost {memory_op_dur} milliseconds, which will lead to large amount of free time." -max_total_duration: 100 #ms -solutions: - - AscendCL@aclMallocMemInner: - desc: "Please set env by command 'export PYTORCH_NPU_ALLOC_CONF=expandable_segments:True' and then start your training job" - - AscendCL@aclrtFreePhysical: - desc: "Please reduce your batch size for training job" \ No newline at end of file diff --git a/profiler/advisor/utils/utils.py b/profiler/advisor/utils/utils.py index 6d943925f..d87b7ef24 100644 --- a/profiler/advisor/utils/utils.py +++ b/profiler/advisor/utils/utils.py @@ -613,3 +613,10 @@ def convert_to_float(num): logger.error(f"Can not convert %ss to float", num) pass return 0 + + +def safe_index(array, value, return_index_if_error=None): + if value in array: + return array.index(value) + + return return_index_if_error \ No newline at end of file diff --git a/profiler/test/ut/advisor/advisor_backend/analyzer_controller/test_analyzer_controller.py b/profiler/test/ut/advisor/advisor_backend/analyzer_controller/test_analyzer_controller.py index 261a0f5db..5033c7d5e 100644 --- a/profiler/test/ut/advisor/advisor_backend/analyzer_controller/test_analyzer_controller.py +++ b/profiler/test/ut/advisor/advisor_backend/analyzer_controller/test_analyzer_controller.py @@ -38,8 +38,6 @@ class TestAnalyzerController(unittest.TestCase): lambda *args: mock_analysis_job_list) @mock.patch("profiler.advisor.analyzer.analyzer_controller.AnalyzerController.overall", lambda *args: mock_analysis_job_list) - @mock.patch("profiler.advisor.analyzer.analyzer_controller.AnalyzerController.memory_analysis", - lambda *args: mock_analysis_job_list) @mock.patch("profiler.advisor.analyzer.analyzer_controller.AnalyzerController.computation_analysis", lambda *args: mock_analysis_job_list) @mock.patch("profiler.advisor.analyzer.analyzer_controller.AnalyzerController.schedule_analysis", @@ -57,8 +55,6 @@ class TestAnalyzerController(unittest.TestCase): lambda *args: mock_analysis_job_list) 
@mock.patch("profiler.advisor.analyzer.analyzer_controller.AnalyzerController.cluster_computation_analysis", lambda *args: mock_analysis_job_list) - @mock.patch("profiler.advisor.analyzer.analyzer_controller.AnalyzerController.cluster_memory_analysis", - lambda *args: mock_analysis_job_list) @mock.patch("profiler.advisor.analyzer.analyzer_controller.AnalyzerController._cluster_profiling_comparison", lambda *args: mock_analysis_job_list) @mock.patch("profiler.advisor.analyzer.analyzer_controller.AnalyzerController.overall", @@ -74,8 +70,6 @@ class TestAnalyzerController(unittest.TestCase): lambda *args: mock_analysis_job_list) @mock.patch("profiler.advisor.analyzer.analyzer_controller.AnalyzerController.cluster_computation_analysis", lambda *args: mock_analysis_job_list) - @mock.patch("profiler.advisor.analyzer.analyzer_controller.AnalyzerController.cluster_memory_analysis", - lambda *args: mock_analysis_job_list) @mock.patch("profiler.advisor.analyzer.analyzer_controller.AnalyzerController._cluster_profiling_comparison", lambda *args: mock_analysis_job_list) @mock.patch("profiler.advisor.analyzer.analyzer_controller.AnalyzerController.overall", @@ -103,15 +97,6 @@ class TestAnalyzerController(unittest.TestCase): job_num = len(Interface.get_scope(Interface.COMPUTATION)) self.assertTrue(isinstance(job_list, list)) and self.assertEqual(len(job_list), job_num) - def test_memory_analysis(self): - analyzer_controller = AnalyzerController(Interface.all_dimension) - job_list = analyzer_controller.memory_analysis(mock_profiling_path, - mock_benchmark_profiling_path, - step, - benchmark_step) - job_num = len(Interface.get_scope(Interface.MEMORY)) - self.assertTrue(isinstance(job_list, list)) and self.assertEqual(len(job_list), job_num) - @mock.patch("profiler.advisor.analyzer.analyzer_controller.AnalyzerController._get_profiling_path_by_rank", lambda *args: mock_profiling_path) def test_cluster_schedule_analysis(self): @@ -147,27 +132,6 @@ class TestAnalyzerController(unittest.TestCase): job_num = len(Interface.get_scope(Interface.COMPUTATION)) self.assertTrue(isinstance(job_list, list)) and self.assertEqual(len(job_list), job_num) - @mock.patch("profiler.advisor.analyzer.analyzer_controller.AnalyzerController._get_profiling_path_by_rank", - lambda *args: mock_profiling_path) - def test_cluster_memory_analysis(self): - analyzer_controller = AnalyzerController(Interface.all_dimension) - analyzer_controller.slow_rank_analyzer = self.mock_slow_rank_analyzer - job_list = analyzer_controller.cluster_memory_analysis(mock_profiling_path) - job_num = len(Interface.get_scope(Interface.MEMORY)) - self.assertTrue(isinstance(job_list, list)) and self.assertEqual(len(job_list), job_num) - - @mock.patch( - "profiler.advisor.analyzer.analyzer_controller.AnalyzerController._get_step_rank_for_cluster_statistic_diff", - lambda *args: step, 0) - @mock.patch("profiler.advisor.analyzer.analyzer_controller.AnalyzerController._get_profiling_path_by_rank", - lambda *args: mock_profiling_path) - def test_cluster_data_comparison(self): - analyzer_controller = AnalyzerController(Interface.all_dimension) - job_list = analyzer_controller._cluster_data_comparison(mock_profiling_path, mock_benchmark_profiling_path, [], - [], [], []) - self.assertTrue(isinstance(job_list, list)) and self.assertFalse(job_list) - - def test_get_target_profiling_path_for_local(self): analyzer_controller = AnalyzerController(Interface.all_dimension) result = analyzer_controller._get_target_profiling_path_for_local(mock_profiling_path, 0) @@ -190,11 
+154,8 @@ if __name__ == '__main__': tester.test_cluster_analysis() tester.test_schedule_analysis() tester.test_computation_analysis() - tester.test_memory_analysis() tester.test_cluster_schedule_analysis() tester.test_cluster_communication_analysis() tester.test_cluster_computation_analysis_for_stage() tester.test_cluster_computation_analysis_for_global() - tester.test_cluster_memory_analysis() - tester.test_cluster_data_comparison() tester.test_get_target_profiling_path_for_local() diff --git a/profiler/test/ut/advisor/advisor_backend/compute_advice/test_pp_stage_computation_analyzer.py b/profiler/test/ut/advisor/advisor_backend/compute_advice/test_pp_stage_computation_analyzer.py index cd9439c48..a8be1bf19 100644 --- a/profiler/test/ut/advisor/advisor_backend/compute_advice/test_pp_stage_computation_analyzer.py +++ b/profiler/test/ut/advisor/advisor_backend/compute_advice/test_pp_stage_computation_analyzer.py @@ -1,10 +1,13 @@ import unittest import copy +import os from profiler.advisor.analyzer.computation.pp_stage_computation_analyzer import PPStageComputationAnalyzer from profiler.test.ut.advisor.advisor_backend.tools.tool import recover_env +mock_profiling_path = os.path.realpath(__file__) + class TestPPStageComputationAnalyzer(unittest.TestCase): @classmethod def tearDownClass(cls) -> None: @@ -31,18 +34,20 @@ class TestPPStageComputationAnalyzer(unittest.TestCase): self.mocked_multiprocess_data[f"rank {i}"] = copy.deepcopy(rank_result_data) def test_merge_multiprocess_result(self): - pp_stage_computation_analyzer = PPStageComputationAnalyzer() - result = pp_stage_computation_analyzer._merge_multiprocess_result() + pp_stage_computation_analyzer = PPStageComputationAnalyzer(mock_profiling_path) + pp_stage_computation_analyzer._merge_multiprocess_result() + result = pp_stage_computation_analyzer.result self.assertFalse(result.data) pp_stage_computation_analyzer._multiprocess_result = copy.deepcopy(self.mocked_multiprocess_data) - result = dict(pp_stage_computation_analyzer._merge_multiprocess_result().data) + pp_stage_computation_analyzer._merge_multiprocess_result() + data = dict(pp_stage_computation_analyzer.result.data) - problems = result.get("problems", {}).get("data", []) + problems = data.get("problems", {}).get("data", []) self.assertEqual(len(problems), self.rank_num) for i in range(self.rank_num): - self.assertTrue(f"rank {i} ai cpu issues" in result) - self.assertTrue(f"rank {i} frequency issues" in result) + self.assertTrue(f"rank {i} ai cpu issues" in data) + self.assertTrue(f"rank {i} frequency issues" in data) if __name__ == '__main__': -- Gitee
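
A closing note on AnalyzerController._get_step_rank_for_cluster_statistic_diff, which this patch promotes to a @staticmethod at the top of the class: it scans the per-row gap between target and benchmark statistics and picks the step/rank where that gap is most extreme. A condensed, runnable sketch of the selection follows; the sample rows and RATIO_THRESHOLD value are illustrative, and the emptiness guard is an extra safety net not present in the patch.

    RATIO_THRESHOLD = 0.05  # illustrative; the real threshold lives on SlowRankAnalyzer

    def max_gap_ratio(data, mean):
        return 0 if mean == 0 else (max(data) - min(data)) / mean

    def step_rank_for_diff(target_rows, benchmark_rows, headers, dimension, get_max=False):
        if dimension not in headers:
            return None, None  # unknown column: mirror the patch's early exit
        idx = headers.index(dimension)
        diff_record = [t[idx] - b[idx] for t, b in zip(target_rows, benchmark_rows)
                       if isinstance(t[idx], (int, float)) and isinstance(b[idx], (int, float))]
        if not diff_record:
            return None, None  # extra guard against division by zero below
        if max_gap_ratio(diff_record, sum(diff_record) / len(diff_record)) < RATIO_THRESHOLD:
            return None, None  # gaps too uniform to single out one rank
        value = max(diff_record) if get_max else min(diff_record)
        row = target_rows[diff_record.index(value)]
        return row[headers.index("step")], row[headers.index("rank_id")]

    headers = ["step", "rank_id", "compute(us)"]
    target = [[1, 0, 1500.0], [1, 1, 1100.0]]
    benchmark = [[1, 0, 1000.0], [1, 1, 1050.0]]
    print(step_rank_for_diff(target, benchmark, headers, "compute(us)", get_max=True))
    # (1, 0) -> step 1 on rank 0 regressed the most on compute time
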