From 913e18fccbfffece2a3c5dc8e5619d15f881b371 Mon Sep 17 00:00:00 2001 From: wuyuhan Date: Mon, 12 Aug 2024 19:42:01 +0800 Subject: [PATCH] =?UTF-8?q?advisor=E9=9B=86=E7=BE=A4=E5=88=86=E6=9E=90?= =?UTF-8?q?=E6=A1=86=E6=9E=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../advisor/analyzer/analyzer_controller.py | 145 ++----- profiler/advisor/analyzer/base_analyzer.py | 42 +- .../analyzer/cluster/slow_link_analyzer.py | 66 ++- .../analyzer/cluster/slow_rank_analyzer.py | 103 ++--- .../dataloader/dataloader_analyzer.py | 14 +- .../analyzer/dataloader/dataloader_checker.py | 10 +- .../graph_fusion/graph_fusion_analyzer.py | 5 +- .../analyzer/memory/memory_analyzer.py | 18 +- .../advisor/analyzer/memory/memory_checker.py | 17 +- .../dispatch/timeline_op_dispatch_analyzer.py | 60 +-- .../fusion_ops/fusion_ops_analyzer.py | 13 +- .../fusion_ops/timeline_api_stack_checker.py | 14 +- .../schedule/syncbn/syncbn_analyzer.py | 16 +- .../schedule/syncbn/syncbn_checker.py | 11 +- .../synchronize_stream_analyzer.py | 15 +- .../synchronize_stream_checker.py | 11 +- profiler/advisor/common/constant.py | 7 +- .../dataset/cluster/cluster_dataset.py | 56 ++- .../advisor/dataset/timeline_event_dataset.py | 386 ++++++------------ .../dataset/timeline_op_collector/__init__.py | 0 .../timeline_op_collector.py | 367 +++++++++++++++++ .../display/html/templates/affinity_api.html | 2 +- .../html/templates/ai_core_frequency.html | 2 +- .../html/templates/operator_ai_cpu.html | 2 +- .../html/templates/operator_block_dim.html | 2 +- .../html/templates/operator_dispatch.html | 2 +- .../templates/operator_dynamic_shape.html | 2 +- .../html/templates/operator_no_bound.html | 2 +- profiler/advisor/utils/obs_utils.py | 129 ------ profiler/advisor/utils/utils.py | 12 +- .../timeline_advice/test_memory_op_checker.py | 62 +++ .../test_timeline_op_collector.py | 144 +++++++ 32 files changed, 1055 insertions(+), 682 deletions(-) create mode 100644 profiler/advisor/dataset/timeline_op_collector/__init__.py create mode 100644 profiler/advisor/dataset/timeline_op_collector/timeline_op_collector.py delete mode 100644 profiler/advisor/utils/obs_utils.py create mode 100644 profiler/test/ut/advisor/advisor_backend/timeline_advice/test_memory_op_checker.py create mode 100644 profiler/test/ut/advisor/advisor_backend/timeline_advice/test_timeline_op_collector.py diff --git a/profiler/advisor/analyzer/analyzer_controller.py b/profiler/advisor/analyzer/analyzer_controller.py index f9a75f128..440f5f15e 100644 --- a/profiler/advisor/analyzer/analyzer_controller.py +++ b/profiler/advisor/analyzer/analyzer_controller.py @@ -12,7 +12,6 @@ from profiler.advisor.analyzer.cluster.slow_rank_analyzer import SlowRankAnalyze from profiler.advisor.analyzer.cluster.slow_link_analyzer import SlowLinkAnalyzer from profiler.advisor.analyzer.computation.pp_stage_computation_analyzer import PPStageComputationAnalyzer from profiler.advisor.common import constant as const -from profiler.advisor.utils.obs_utils import ObsProfilingFileHandler, enable_mox, MoxUtils from profiler.advisor.interface.interface import Interface from profiler.cluster_analyse.cluster_data_preprocess.pytorch_data_preprocessor import PytorchDataPreprocessor from profiler.advisor.common.analyzer_scopes import SupportedScopes @@ -34,6 +33,33 @@ class AnalyzerController: self.rank_id_map = {} self._is_cluster = False + @staticmethod + def _get_step_rank_for_cluster_statistic_diff(target_cluster_statistic_data, 
benchmark_cluster_statistic_data, + headers, dimension, get_max=False): + if dimension not in headers: + logger.error("Error dimension %s for cluster statistics data, optionals are %s.", dimension, headers) + return None, None + + dimension_index = headers.index(dimension) + diff_record = [] + for target_row_data, benchmark_row_data in zip(target_cluster_statistic_data, benchmark_cluster_statistic_data): + target_data = target_row_data[dimension_index] + benchmark_data = benchmark_row_data[dimension_index] + if not isinstance(target_data, (int, float)) or not isinstance(benchmark_data, (int, float)): + continue + diff_record.append(target_data - benchmark_data) + + if SlowRankAnalyzer.compute_max_gap_ratio(diff_record, sum(diff_record) / len( + diff_record)) < SlowRankAnalyzer.RATIO_THRESHOLD: + return None, None + + value = max(diff_record) if get_max else min(diff_record) + value_index = diff_record.index(value) + step = target_cluster_statistic_data[value_index][headers.index("step")] + target_rank_id = target_cluster_statistic_data[value_index][headers.index("rank_id")] + + return step, target_rank_id + def do_analysis(self): result_list = [] @@ -236,9 +262,6 @@ class AnalyzerController: ) kwargs = {"stages_profiling_path": stages_profiling_path, "profiling_path": profiling_path} job_list.append((Interface.COMPUTATION, SupportedScopes.STAGE_COMPUTE, Interface(**kwargs), kwargs)) - # pp_stage_computation_analyzer = PPStageComputationAnalyzer() - # pp_stage_computation_analyzer.optimize(stages_profiling_path) - else: # 不区分stage,对所有卡取Min max进行分析 logger.info("Without pipeline parallel stage, Global analysis steps and ranks is %s", @@ -345,50 +368,27 @@ class AnalyzerController: return job_list def _is_cluster_profiling(self, profiling_path): - # 判断是否为集群场景,obs场景没办法调用PytorchDataPreprocessor判断是否是集群场景,只能根据ascend_pt后缀来判断 - - if not profiling_path.startswith(self.OBS_PREFIX): - # 数据在本地盘 - path_list = [os.path.join(profiling_path, dir_name) for dir_name in os.listdir(profiling_path)] - ascend_pt_dirs = [path for path in path_list if os.path.isdir(path) and path.endswith("ascend_pt")] - data_processor = PytorchDataPreprocessor(ascend_pt_dirs) - - self.cluster_local_data_map[profiling_path] = data_processor.get_data_map() - - if not self.cluster_local_data_map or not self.cluster_local_data_map.get(profiling_path): - return False - - self.default_rank_id = list(self.cluster_local_data_map[profiling_path].keys())[0] - - self.slow_rank_analyzer = SlowRankAnalyzer(profiling_path) - self.slow_link_analyzer = SlowLinkAnalyzer(profiling_path) - # 仅八卡及以上认为是集群场景 - return len(self.cluster_local_data_map[profiling_path]) >= self.CLUSTER_RANK_THRESHOLD - - # 数据在obs上 - logger.info(f"Obs profiling path: %s", profiling_path) - path_list = [file_name.endswith("ascend_pt") for file_name in - MoxUtils.list_directory(profiling_path)] - obs_profiling_handler = ObsProfilingFileHandler(collection_path=profiling_path) - if sum(path_list) >= self.CLUSTER_RANK_THRESHOLD: - obs_profiling_handler.download_files_by_pattern(const.CLUSTER_ANALYSIS_FILE_PATTERN) - local_profiling_path = obs_profiling_handler.get_cluster_analysis_local_path() - self.slow_rank_analyzer = SlowRankAnalyzer(local_profiling_path) - self.slow_link_analyzer = SlowLinkAnalyzer(local_profiling_path) - return True - else: + path_list = [os.path.join(profiling_path, dir_name) for dir_name in os.listdir(profiling_path)] + ascend_pt_dirs = [path for path in path_list if os.path.isdir(path) and path.endswith("ascend_pt")] + data_processor = 
PytorchDataPreprocessor(ascend_pt_dirs) + + self.cluster_local_data_map[profiling_path] = data_processor.get_data_map() + + if not self.cluster_local_data_map or not self.cluster_local_data_map.get(profiling_path): return False + self.default_rank_id = list(self.cluster_local_data_map[profiling_path].keys())[0] + + self.slow_rank_analyzer = SlowRankAnalyzer(profiling_path) + self.slow_link_analyzer = SlowLinkAnalyzer(profiling_path) + return len(self.cluster_local_data_map[profiling_path]) >= self.CLUSTER_RANK_THRESHOLD + def _get_profiling_path_by_rank(self, profiling_path, rank_id=None): # 根据传入的集群/单卡 profiling数据路径,以及rank id,获取对应rank的profiling数据路径,如果数据在obs则会自动下载至本地。 if not profiling_path: return None - if profiling_path.startswith(self.OBS_PREFIX): - # profiling数据在obs上时,自动下载对应rank的数据至本地,并返回本地路径 - return self._get_target_profiling_path_for_obs(profiling_path, rank_id) - return self._get_target_profiling_path_for_local(profiling_path, rank_id) def _get_target_profiling_path_for_local(self, profiling_path, rank_id): @@ -398,69 +398,8 @@ class AnalyzerController: return rank_id_map.get(rank_id) - def _get_target_profiling_path_for_obs(self, profiling_path, rank_id): - # 自动下载obs上对应数据至本地,并返回本地路径 - - if not MoxUtils.check_dir_exist(profiling_path): - return None - obs_profiling_handler = ObsProfilingFileHandler(collection_path=profiling_path) - - if not obs_profiling_handler.rank_ids: - logger.error("Can not find file like '%s' from profiling path %s", const.PROFILER_INFO_FILE_PATTERN, - profiling_path) - - if rank_id and rank_id not in obs_profiling_handler.rank_ids: - logger.warning("File 'profiler_info_%s.json' not exists in profiling path %s", rank_id, profiling_path) - return None - - if not rank_id and obs_profiling_handler.rank_ids: - rank_id = obs_profiling_handler.rank_ids[0] - - obs_profiling_handler.download_rank_profiling_files([rank_id]) - target_profiling_path = obs_profiling_handler.get_rank_profiling_local_path(rank_id) - return target_profiling_path - def _check_profiling_path_valid(self, profiling_path): - # 检查profiling数据路径是否存在,支持obs路径和本地路径 - - if profiling_path.startswith(self.OBS_PREFIX): - if not enable_mox: - logger.error("Only support analyze obs profiling while in ModelArts") - return False - - if not MoxUtils.check_dir_exist(profiling_path): - logger.error("Obs profiling path is not existed. Invalid profiling path: %s", profiling_path) - return False - return True - elif not Path(profiling_path).exists(): + if not Path(profiling_path).exists(): logger.error("Profiling path is not existed. 
Invalid profiling path: %s", profiling_path) return False - else: - return True - - @staticmethod - def _get_step_rank_for_cluster_statistic_diff(target_cluster_statistic_data, benchmark_cluster_statistic_data, - headers, dimension, get_max=False): - if dimension not in headers: - logger.error("Error dimension %s for cluster statistics data, optionals are %s.", dimension, headers) - return None, None - - dimension_index = headers.index(dimension) - diff_record = [] - for target_row_data, benchmark_row_data in zip(target_cluster_statistic_data, benchmark_cluster_statistic_data): - target_data = target_row_data[dimension_index] - benchmark_data = benchmark_row_data[dimension_index] - if not isinstance(target_data, (int, float)) or not isinstance(benchmark_data, (int, float)): - continue - diff_record.append(target_data - benchmark_data) - - if SlowRankAnalyzer.compute_max_gap_ratio(diff_record, sum(diff_record) / len( - diff_record)) < SlowRankAnalyzer.RATIO_THRESHOLD: - return None, None - - value = max(diff_record) if get_max else min(diff_record) - value_index = diff_record.index(value) - step = target_cluster_statistic_data[value_index][headers.index("step")] - target_rank_id = target_cluster_statistic_data[value_index][headers.index("rank_id")] - - return step, target_rank_id + return True diff --git a/profiler/advisor/analyzer/base_analyzer.py b/profiler/advisor/analyzer/base_analyzer.py index 441005b62..40df11c34 100644 --- a/profiler/advisor/analyzer/base_analyzer.py +++ b/profiler/advisor/analyzer/base_analyzer.py @@ -22,12 +22,16 @@ from profiler.advisor.common.version_control import VersionControl from profiler.advisor.dataset.dataset import Dataset from profiler.advisor.result.result import OptimizeResult from profiler.advisor.display.html.render import HTMLRender +from profiler.advisor.display.html.priority_background_color import PriorityBackgroundColor +from profiler.advisor.utils.utils import safe_division logger = logging.getLogger() class BaseAnalyzer(VersionControl, metaclass=ABCMeta): _SUPPORT_VERSIONS = constant.SUPPORTED_CANN_VERSION + ANALYZER_HIGH_PRIORITY_TIME_RATIO = 0.05 + ANALYZER_MEDIUM_PRIORITY_TIME_RATIO = 0.03 dataset_cls_list = [] @@ -73,7 +77,23 @@ class BaseAnalyzer(VersionControl, metaclass=ABCMeta): def optimize(self, **kwargs): pass - def init_dataset_list(self)->None: + @abstractmethod + def get_priority(self): + pass + + @staticmethod + def get_first_data_by_key(data, key) -> Union[Dataset, None]: + """ + get the first member from data with key + :param data: input data + :param key: data key + :return: the first dataset in dataset list + """ + if key in data and len(data[key]) > 0: + return data[key][0] + return None + + def init_dataset_list(self) -> None: dataset_cls_list = self.dataset_cls_list if len(dataset_cls_list) == 0: logger.warning(f"Analyzer: %s don't rely on any dataset!", self.__class__.__name__) @@ -87,14 +107,12 @@ class BaseAnalyzer(VersionControl, metaclass=ABCMeta): self.dataset_list[key] = [] self.dataset_list[key].append(dataset) - @staticmethod - def get_first_data_by_key(data, key) -> Union[Dataset, None]: - """ - get the first member from data with key - :param data: input data - :param key: data key - :return: the first dataset in dataset list - """ - if key in data and len(data[key]) > 0: - return data[key][0] - return None + def get_priority_by_time_ratio(self, dur, step_dur): + time_ratio = safe_division(dur, step_dur) + print(f" =================== time_ratio is {time_ratio}, dur is {dur}, step_dur is {step_dur} 
=================") + if time_ratio >= self.ANALYZER_HIGH_PRIORITY_TIME_RATIO: + return PriorityBackgroundColor.high + elif time_ratio >= self.ANALYZER_MEDIUM_PRIORITY_TIME_RATIO: + return PriorityBackgroundColor.medium + else: + return PriorityBackgroundColor.low diff --git a/profiler/advisor/analyzer/cluster/slow_link_analyzer.py b/profiler/advisor/analyzer/cluster/slow_link_analyzer.py index e57704d4f..bb2ba5f2a 100644 --- a/profiler/advisor/analyzer/cluster/slow_link_analyzer.py +++ b/profiler/advisor/analyzer/cluster/slow_link_analyzer.py @@ -15,11 +15,16 @@ from collections import defaultdict from typing import Dict, List +import logging + from profiler.advisor.analyzer.base_analyzer import BaseAnalyzer from profiler.advisor.common import constant from profiler.advisor.result.result import OptimizeResult from profiler.advisor.result.item import OptimizeItem, OptimizeRecord from profiler.advisor.dataset.cluster.cluster_dataset import ClusterCommunicationDataset +from profiler.advisor.utils.utils import safe_index + +logger = logging.getLogger() class SlowLinkAnalyzer(BaseAnalyzer): @@ -57,7 +62,7 @@ class SlowLinkAnalyzer(BaseAnalyzer): def optimize(self, **kwargs): if self.rank_bw_dict is None: - print("Slow link analysis failed due to data loading failure. \ + logger.error("Slow link analysis failed due to data loading failure. \ Please check your cluster_analysis_output folder. \ If you are not concerned about this type of data, please ignore this message.") return self.result @@ -76,7 +81,7 @@ class SlowLinkAnalyzer(BaseAnalyzer): if len(data_list) > 0: avg_bw = round(sum(data_list) / len(data_list), 3) else: - print("The slow link (identified bottleneck) cannot provide a bottleneck \ + logger.info("The slow link (identified bottleneck) cannot provide a bottleneck \ because the analysis data is missing bandwidth information.") return self.bottelneck += f'{link_type}: \n' \ @@ -95,8 +100,12 @@ class SlowLinkAnalyzer(BaseAnalyzer): details_dict = {} headers = list({k for rank_bw_value in self.rank_bw_dict.values() for k in rank_bw_value.keys()}) headers.sort() - data_list = [list(map(int, step_rank.split("-"))) + [rank_bw.get(k, 0) for k in headers] for step_rank, rank_bw - in self.rank_bw_dict.items()] + + data_list = [] + for step_rank, rank_bw in self.rank_bw_dict.items(): + step_rank_list = list(map(int, step_rank.split(constant.STEP_RANK_SEP))) + value_list = [rank_bw.get(i, 0) for i in headers] + data_list.append(step_rank_list + value_list) data_list.sort(key=lambda x: (x[0], x[1])) # 按rank_id排序 details_dict["headers"] = ["step", "rank_id"] + headers @@ -115,8 +124,8 @@ class SlowLinkAnalyzer(BaseAnalyzer): ) self.result.add(OptimizeRecord(optimization_item)) - for i, data in enumerate(self.format_datas["data"]): - self.result.add_detail(SlowLinkAnalyzer.SLOW_LINK_ANALYSIS, self.format_datas["headers"], data) + for data in self.format_datas.get("data", []): + self.result.add_detail(SlowLinkAnalyzer.SLOW_LINK_ANALYSIS, self.format_datas.get("headers", []), data) def make_render(self, template_key="cluster"): result_for_html = { @@ -141,22 +150,39 @@ class SlowLinkAnalyzer(BaseAnalyzer): raise RuntimeError(f"Error bindwidth type {bindwidth_type}, optionals are {bindwidth_key_map.keys()}") headers = self.format_datas.get("headers") - bindwidth_index = headers.index(bindwidth_key_map.get(bindwidth_type)) - data_list = [tuple_list[bindwidth_index] for tuple_list in self.format_datas.get("data")] - max_bandwidth, min_bandwidth = max(data_list), min(data_list) - if 
self.compute_max_gap_ratio(data_list, sum(data_list) / len( - data_list)) < self.RATIO_THRESHOLD: - return global_step_rank + bindwidth_index = safe_index(headers, bindwidth_key_map.get(bindwidth_type)) + + if bindwidth_index is not None: + data_list = [tuple_list[bindwidth_index] for tuple_list in self.format_datas.get("data", [])] + max_bandwidth, min_bandwidth = max(data_list), min(data_list) + + if self.compute_max_gap_ratio(data_list, sum(data_list) / len( + data_list)) < self.RATIO_THRESHOLD: + return global_step_rank - max_bandwidth_index = data_list.index(max_bandwidth) - min_bandwidth_index = data_list.index(min_bandwidth) - max_bandwidth_rank_id = self.format_datas.get("data")[max_bandwidth_index][headers.index("rank_id")] - max_bandwidth_step = self.format_datas.get("data")[max_bandwidth_index][headers.index("step")] - min_bandwidth_rank_id = self.format_datas.get("data")[min_bandwidth_index][headers.index("rank_id")] - min_bandwidth_step = self.format_datas.get("data")[min_bandwidth_index][headers.index("step")] + max_bandwidth_index = data_list.index(max_bandwidth) + min_bandwidth_index = data_list.index(min_bandwidth) - global_step_rank["maximum"] = {"rank_id": max_bandwidth_rank_id, "step": max_bandwidth_step} - global_step_rank["minimum"] = {"rank_id": min_bandwidth_rank_id, "step": min_bandwidth_step} + rank_id_index = safe_index(headers, "rank_id") + step_index = safe_index(headers, "step") + + if rank_id_index is None: + return global_step_rank + + max_bandwidth_rank_id = self.format_datas.get("data")[max_bandwidth_index][rank_id_index] + min_bandwidth_rank_id = self.format_datas.get("data")[min_bandwidth_index][rank_id_index] + + if step_index is None: + max_bandwidth_step, min_bandwidth_step = constant.DEFAULT_STEP, constant.DEFAULT_STEP + else: + max_bandwidth_step = self.format_datas.get("data")[max_bandwidth_index][step_index] + min_bandwidth_step = self.format_datas.get("data")[min_bandwidth_index][step_index] + + global_step_rank["maximum"] = {"rank_id": max_bandwidth_rank_id, "step": max_bandwidth_step} + global_step_rank["minimum"] = {"rank_id": min_bandwidth_rank_id, "step": min_bandwidth_step} return global_step_rank + + def get_priority(self): + pass \ No newline at end of file diff --git a/profiler/advisor/analyzer/cluster/slow_rank_analyzer.py b/profiler/advisor/analyzer/cluster/slow_rank_analyzer.py index e04add6ba..fb81883fe 100644 --- a/profiler/advisor/analyzer/cluster/slow_rank_analyzer.py +++ b/profiler/advisor/analyzer/cluster/slow_rank_analyzer.py @@ -13,13 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. 
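The hunks in this file and in slow_link_analyzer.py above rely on safe_index from profiler.advisor.utils.utils, whose implementation is outside this excerpt. A minimal sketch of the assumed behaviour, for reference only:

def safe_index(data, value, default=None):
    # Assumed helper: behaves like list.index but returns `default` instead of raising
    # ValueError when `value` is missing, so callers can simply check `if index is None`.
    if data and value in data:
        return data.index(value)
    return default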
-from collections import defaultdict
-from typing import Dict, List
+import logging
+
 from profiler.advisor.analyzer.base_analyzer import BaseAnalyzer
 from profiler.advisor.common import constant
 from profiler.advisor.result.result import OptimizeResult
 from profiler.advisor.result.item import OptimizeItem, OptimizeRecord
 from profiler.advisor.dataset.cluster.cluster_dataset import ClusterStepTraceTimeDataset
+from profiler.advisor.utils.utils import safe_index
+
+logger = logging.getLogger()
 
 class SlowRankAnalyzer(BaseAnalyzer):
@@ -35,7 +38,7 @@ class SlowRankAnalyzer(BaseAnalyzer):
     def __init__(self, collection_path, n_processes: int = 1, **kwargs):
         super().__init__(collection_path, n_processes, **kwargs)
         key = ClusterStepTraceTimeDataset.get_key()
-        self.step_trace_class = self.get_first_data_by_key(self.dataset_list, key)
+        self.step_trace_class = self.get_first_data_by_key(self.dataset_list, key)
         self.step_trace_dict = self.step_trace_class.get_data()
         self.stages = self.step_trace_class.get_stages()
         self.result = OptimizeResult()
@@ -48,16 +51,23 @@ class SlowRankAnalyzer(BaseAnalyzer):
     def steps(self):
         return sorted(list(self._steps))
 
+    @staticmethod
+    def compute_max_gap_ratio(data: list, mean: float):
+        if mean == 0:
+            return 0
+        else:
+            return (max(data) - min(data)) / mean
+
     def optimize(self, **kwargs):
         if self.step_trace_dict is None:
-            print("Slow rank analysis failed due to data loading failure. Please check your cluster_analysis_output folder. \
+            logger.error("Slow rank analysis failed due to data loading failure. Please check your cluster_analysis_output folder. \
                 If you are not concerned about this type of data, please ignore this message.")
             return self.result
         self.process()
         self.make_record()
         self.make_render(kwargs.get("template_key"))
         return self.result
-
+
     def process(self):
         total_time_list = [sum(data_tuple) for rank_id, data_tuple in self.step_trace_dict.items()]
         if total_time_list:
@@ -77,7 +87,6 @@ class SlowRankAnalyzer(BaseAnalyzer):
                           f' because the max difference of {self.BOTTLENECK_LIST[produce_type]} time \n' \
                           f'    has reached {round(max_ratio * mean_total_time / 1000, 3)}ms.
\n' - def make_record(self): """ make record for what and how to optimize @@ -89,15 +98,18 @@ class SlowRankAnalyzer(BaseAnalyzer): self.suggestion ) self.result.add(OptimizeRecord(optimization_item)) - for i, data in enumerate(self.format_datas["data"]): - self.result.add_detail(SlowRankAnalyzer.SLOW_RANK_ANALYSIS, self.format_datas["headers"], data) + + data_list = self.format_datas.get("data", []) + headers = self.format_datas.get("headers", []) + for data in data_list: + self.result.add_detail(SlowRankAnalyzer.SLOW_RANK_ANALYSIS, headers, data) def format_details(self): details_dict = {} headers = ["step", "rank_id", "compute(us)", "communication(us)", "free(us)"] data_list = [] for key, value in self.step_trace_dict.items(): - step, rank_id = key.split("-") + step, rank_id = key.split(constant.STEP_RANK_SEP) data_list.append([int(step), int(rank_id)] + value) if step and step not in self._steps: self._steps.add(step) @@ -108,9 +120,9 @@ class SlowRankAnalyzer(BaseAnalyzer): def make_render(self, template_key="cluster"): result_for_html = { - "Description" : self.bottelneck, - "suggestion" : self.suggestion, - "details" : [self.format_datas] + "Description": self.bottelneck, + "suggestion": self.suggestion, + "details": [self.format_datas] } self.html_render.render_template(key=template_key, @@ -121,36 +133,17 @@ class SlowRankAnalyzer(BaseAnalyzer): torch_version=self.torch_version, result=result_for_html) - @staticmethod - def compute_max_gap_ratio(data: list, mean: float): - if mean == 0: - return 0 - else: - return (max(data) - min(data)) / mean - - @staticmethod - def get_min_max_rank_id(slow_rank_result, dimension): - slow_rank_data = slow_rank_result.get(SlowRankAnalyzer.SLOW_RANK_ANALYSIS) - headers = slow_rank_data.get("headers") - dimension_index = headers.index(dimension) - data_list = [tuple_list[dimension_index] for tuple_list in slow_rank_data.get("data")] - max_time, min_time = max(data_list), min(data_list) - - if SlowRankAnalyzer.compute_max_gap_ratio(data_list, sum(data_list) / len( - data_list)) < SlowRankAnalyzer.RATIO_THRESHOLD: - return None, None - max_time_index = data_list.index(max_time) - min_time_index = data_list.index(min_time) - max_time_rank_id = slow_rank_data.get("data")[max_time_index][headers.index("rank_id")] - min_time_rank_id = slow_rank_data.get("data")[min_time_index][headers.index("rank_id")] - - return max_time_rank_id, min_time_rank_id - def get_global_step_rank(self, dimension): global_step_rank = {} headers = self.format_datas.get("headers") - dimension_index = headers.index(dimension) + + dimension_index = safe_index(headers, dimension) + rank_id_index = safe_index(headers, "rank_id") + step_index = safe_index(headers, "step") + if dimension_index is None or rank_id_index is None: + return global_step_rank + data_list = [tuple_list[dimension_index] for tuple_list in self.format_datas.get("data")] max_time, min_time = max(data_list), min(data_list) @@ -159,10 +152,15 @@ class SlowRankAnalyzer(BaseAnalyzer): return global_step_rank max_time_index = data_list.index(max_time) min_time_index = data_list.index(min_time) - max_time_rank_id = self.format_datas.get("data")[max_time_index][headers.index("rank_id")] - max_time_step = self.format_datas.get("data")[max_time_index][headers.index("step")] - min_time_rank_id = self.format_datas.get("data")[min_time_index][headers.index("rank_id")] - min_time_step = self.format_datas.get("data")[min_time_index][headers.index("step")] + + max_time_rank_id = 
self.format_datas.get("data")[max_time_index][rank_id_index] + min_time_rank_id = self.format_datas.get("data")[min_time_index][rank_id_index] + + if step_index is not None: + max_time_step = self.format_datas.get("data")[max_time_index][step_index] + min_time_step = self.format_datas.get("data")[min_time_index][step_index] + else: + max_time_step, min_time_step = constant.DEFAULT_STEP, constant.DEFAULT_STEP global_step_rank["maximum"] = {"rank_id": max_time_rank_id, "step": max_time_step} global_step_rank["minimum"] = {"rank_id": min_time_rank_id, "step": min_time_step} @@ -170,16 +168,22 @@ class SlowRankAnalyzer(BaseAnalyzer): return global_step_rank def get_stage_step_rank(self, dimension): + stage_step_rank = {} + headers = self.format_datas.get("headers") - dimension_index = headers.index(dimension) - rank_index = headers.index("rank_id") - step_index = headers.index("step") + dimension_index = safe_index(headers, dimension) + rank_id_index = safe_index(headers, "rank_id") + step_index = safe_index(headers, "step") + if dimension_index is None or rank_id_index is None: + return stage_step_rank - step_list = [tuple_list[step_index] for tuple_list in self.format_datas.get("data")] - rank_list = [tuple_list[rank_index] for tuple_list in self.format_datas.get("data")] + rank_list = [tuple_list[rank_id_index] for tuple_list in self.format_datas.get("data")] cost_time_list = [tuple_list[dimension_index] for tuple_list in self.format_datas.get("data")] - stage_step_rank = {} + if step_index is not None: + step_list = [tuple_list[step_index] for tuple_list in self.format_datas.get("data")] + else: + step_list = [constant.DEFAULT_STEP] * len(rank_list) for index, stage in enumerate(self.stages): tmp_step_list, tmp_rank_list, tmp_time_list = [], [], [] @@ -206,3 +210,6 @@ class SlowRankAnalyzer(BaseAnalyzer): "step": tmp_step_list[min_time_index]} return stage_step_rank + + def get_priority(self): + pass \ No newline at end of file diff --git a/profiler/advisor/analyzer/dataloader/dataloader_analyzer.py b/profiler/advisor/analyzer/dataloader/dataloader_analyzer.py index 291c3a1f9..3d1a537c2 100644 --- a/profiler/advisor/analyzer/dataloader/dataloader_analyzer.py +++ b/profiler/advisor/analyzer/dataloader/dataloader_analyzer.py @@ -5,26 +5,30 @@ from typing import List, Dict, Any from profiler.advisor.analyzer.base_analyzer import BaseAnalyzer from profiler.advisor.result.result import OptimizeResult from profiler.advisor.analyzer.dataloader.dataloader_checker import DataloaderChecker +from profiler.advisor.display.html.priority_background_color import PriorityBackgroundColor from profiler.advisor.display.html.render import HTMLRender -from profiler.advisor.dataset.timeline_event_dataset import TimelineEventDataset +from profiler.advisor.dataset.timeline_event_dataset import ScheduleAnalysisDataset logger = logging.getLogger() class DataloaderAnalyzer(BaseAnalyzer): - dataset_cls_list = [TimelineEventDataset] + dataset_cls_list = [ScheduleAnalysisDataset] def __init__(self, collection_path, n_processes: int = 1, **kwargs) -> None: super().__init__(collection_path, n_processes, **kwargs) - key = TimelineEventDataset.get_key() + key = ScheduleAnalysisDataset.get_key() self.dataset = self.get_first_data_by_key(self.dataset_list, key) self.result = OptimizeResult() self.html_render = HTMLRender() - @BaseAnalyzer.check_data((TimelineEventDataset.get_key(),)) + @BaseAnalyzer.check_data((ScheduleAnalysisDataset.get_key(),)) def optimize(self, **kwargs): dataloader_checker = DataloaderChecker() 
dataloader_checker.check_slow_dataloader(self.dataset) dataloader_checker.make_record(self.result) - dataloader_checker.make_render(self.html_render) + dataloader_checker.make_render(self.html_render, priority=self.get_priority()) return self.result + + def get_priority(self): + return PriorityBackgroundColor.high diff --git a/profiler/advisor/analyzer/dataloader/dataloader_checker.py b/profiler/advisor/analyzer/dataloader/dataloader_checker.py index eb1886284..728a659fc 100644 --- a/profiler/advisor/analyzer/dataloader/dataloader_checker.py +++ b/profiler/advisor/analyzer/dataloader/dataloader_checker.py @@ -3,7 +3,7 @@ import re import logging import yaml -from profiler.advisor.dataset.timeline_event_dataset import TimelineEventDataset +from profiler.advisor.dataset.timeline_event_dataset import ScheduleAnalysisDataset from profiler.advisor.result.result import OptimizeResult from profiler.advisor.result.item import OptimizeItem, OptimizeRecord from profiler.cluster_analyse.common_func.file_manager import FileManager @@ -22,7 +22,7 @@ class DataloaderChecker: self.dataloader_duration_threshold = None self._init_rule() - def check_slow_dataloader(self, event_dataset: TimelineEventDataset): + def check_slow_dataloader(self, event_dataset: ScheduleAnalysisDataset): """ :Param event_dataset: dataset of timeline event """ @@ -53,14 +53,16 @@ class DataloaderChecker: for optimization in self.optimization_item: result.add(OptimizeRecord(optimization)) - def make_render(self, html_render): + def make_render(self, html_render, **kwargs): if not self.dataloader_issues: return + priority = kwargs.get("priority") html_render.render_template(key="dataloader", template_dir="templates", template_name="slow_dataloader.html", desc=self.desc, - suggestions=self.suggestions) + suggestions=self.suggestions, + priority_background_color=priority) def _init_rule(self): dataloader_rule_path = os.path.join( diff --git a/profiler/advisor/analyzer/graph_fusion/graph_fusion_analyzer.py b/profiler/advisor/analyzer/graph_fusion/graph_fusion_analyzer.py index b78ac3d79..32811cb2e 100644 --- a/profiler/advisor/analyzer/graph_fusion/graph_fusion_analyzer.py +++ b/profiler/advisor/analyzer/graph_fusion/graph_fusion_analyzer.py @@ -44,8 +44,5 @@ class FusionOPAnalyzer(BaseAnalyzer): checker.make_record(self.result) self.html = checker.make_render(self.html_render, add_render_list) - def make_record(self): - pass - - def make_render(self): + def get_priority(self): pass diff --git a/profiler/advisor/analyzer/memory/memory_analyzer.py b/profiler/advisor/analyzer/memory/memory_analyzer.py index 7738cc23d..f4f1ca83a 100644 --- a/profiler/advisor/analyzer/memory/memory_analyzer.py +++ b/profiler/advisor/analyzer/memory/memory_analyzer.py @@ -4,25 +4,33 @@ from profiler.advisor.analyzer.base_analyzer import BaseAnalyzer from profiler.advisor.result.result import OptimizeResult from profiler.advisor.analyzer.memory.memory_checker import MemoryOpsChecker from profiler.advisor.display.html.render import HTMLRender -from profiler.advisor.dataset.timeline_event_dataset import TimelineEventDataset +from profiler.advisor.dataset.timeline_event_dataset import ScheduleAnalysisDataset +from profiler.advisor.display.html.priority_background_color import PriorityBackgroundColor logger = logging.getLogger() class MemoryAnalyzer(BaseAnalyzer): - dataset_cls_list = [TimelineEventDataset] + dataset_cls_list = [ScheduleAnalysisDataset] def __init__(self, collection_path, n_processes: int = 1, **kwargs) -> None: super().__init__(collection_path, 
n_processes, **kwargs) - key = TimelineEventDataset.get_key() + key = ScheduleAnalysisDataset.get_key() self.dataset = self.get_first_data_by_key(self.dataset_list, key) self.result = OptimizeResult() self.html_render = HTMLRender() - @BaseAnalyzer.check_data((TimelineEventDataset.get_key(),)) + @BaseAnalyzer.check_data((ScheduleAnalysisDataset.get_key(),)) def optimize(self, **kwargs): memory_checker = MemoryOpsChecker() memory_checker.check_memory_ops(self.dataset) memory_checker.make_record(self.result) - memory_checker.make_render(self.html_render) + memory_checker.make_render(self.html_render, priority=self.get_priority(memory_checker.max_mem_op_dur)) return self.result + + def get_priority(self, max_mem_op_dur): + step_duration = getattr(self.dataset, "step_duration", None) + if step_duration is None: + return PriorityBackgroundColor.low + + return self.get_priority_by_time_ratio(max_mem_op_dur, step_duration) diff --git a/profiler/advisor/analyzer/memory/memory_checker.py b/profiler/advisor/analyzer/memory/memory_checker.py index 4042dae9a..b143992d2 100644 --- a/profiler/advisor/analyzer/memory/memory_checker.py +++ b/profiler/advisor/analyzer/memory/memory_checker.py @@ -3,7 +3,7 @@ import re import logging import yaml -from profiler.advisor.dataset.timeline_event_dataset import TimelineEventDataset, MemCollector +from profiler.advisor.dataset.timeline_event_dataset import ScheduleAnalysisDataset, MemCollector from profiler.advisor.result.result import OptimizeResult from profiler.advisor.result.item import OptimizeItem, OptimizeRecord from profiler.cluster_analyse.common_func.file_manager import FileManager @@ -20,8 +20,9 @@ class MemoryOpsChecker: self.desc = "" self.suggestions = [] self.memory_ops_duration_threshold = None + self.max_mem_op_dur = 0 - def check_memory_ops(self, event_dataset: TimelineEventDataset): + def check_memory_ops(self, event_dataset: ScheduleAnalysisDataset): """ :Param event_dataset: dataset of timeline event """ @@ -31,15 +32,17 @@ class MemoryOpsChecker: return rule = event_dataset.memory_ops.rule - max_dur = rule.get("max_total_duration") + max_dur_thres = rule.get("max_total_duration") raw_problem = rule.get("problem") for memory_op_name, memory_op_info in event_dataset.memory_ops.mem_op_info.items(): # convert to milli-sec op_dur = memory_op_info.get("total_dur") / 1000 op_count = memory_op_info.get("count") - if op_dur < max_dur: + if op_dur < max_dur_thres: continue + if op_dur > self.max_mem_op_dur: + self.max_mem_op_dur = op_dur self.memory_issues = True self.desc += raw_problem.format(memory_op_num=op_count, memory_op_name=memory_op_name, @@ -62,11 +65,13 @@ class MemoryOpsChecker: for optimization in self.optimization_item: result.add(OptimizeRecord(optimization)) - def make_render(self, html_render): + def make_render(self, html_render, **kwargs): if not self.memory_issues: return + priority = kwargs.get("priority") html_render.render_template(key="memory", template_dir="templates", template_name="memory.html", desc=self.desc, - suggestions=self.suggestions) + suggestions=self.suggestions, + priority_background_color=priority) diff --git a/profiler/advisor/analyzer/schedule/dispatch/timeline_op_dispatch_analyzer.py b/profiler/advisor/analyzer/schedule/dispatch/timeline_op_dispatch_analyzer.py index 0e62a3ff0..58b2c301b 100644 --- a/profiler/advisor/analyzer/schedule/dispatch/timeline_op_dispatch_analyzer.py +++ b/profiler/advisor/analyzer/schedule/dispatch/timeline_op_dispatch_analyzer.py @@ -16,26 +16,26 @@ # limitations under the License. 
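The memory checker above and the dispatch analyzer below turn an operator duration into a render priority through BaseAnalyzer.get_priority_by_time_ratio, added earlier in this patch. A standalone sketch of that mapping, not the actual method, assuming safe_division returns 0 for a zero denominator:

HIGH_RATIO = 0.05    # ANALYZER_HIGH_PRIORITY_TIME_RATIO in base_analyzer.py
MEDIUM_RATIO = 0.03  # ANALYZER_MEDIUM_PRIORITY_TIME_RATIO in base_analyzer.py

def priority_by_time_ratio(dur, step_dur):
    # Ratio of the suspicious operator time to the whole step duration.
    ratio = dur / step_dur if step_dur else 0
    if ratio >= HIGH_RATIO:
        return "high"
    if ratio >= MEDIUM_RATIO:
        return "medium"
    return "low"

# Example: a 6 ms memory op inside a 100 ms step gives 0.06, a high-priority issue;
# 2 ms inside the same step gives 0.02, rendered as low priority.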
import logging - from profiler.advisor.common import constant as const from profiler.advisor.analyzer.base_analyzer import BaseAnalyzer -from profiler.advisor.dataset.timeline_event_dataset import TimelineEventDataset +from profiler.advisor.dataset.timeline_event_dataset import ScheduleAnalysisDataset from profiler.advisor.result.item import OptimizeItem, OptimizeRecord from profiler.advisor.result.result import OptimizeResult from profiler.advisor.display.html.render import HTMLRender +from profiler.advisor.display.html.priority_background_color import PriorityBackgroundColor logger = logging.getLogger() class OpDispatchAnalyzer(BaseAnalyzer): - dataset_cls_list = [TimelineEventDataset] + dataset_cls_list = [ScheduleAnalysisDataset] """ operator dispatch optimizer """ def __init__(self, collection_path, n_processes: int = 1, **kwargs) -> None: super().__init__(collection_path, n_processes, **kwargs) - key = TimelineEventDataset.get_key() + key = ScheduleAnalysisDataset.get_key() self.dataset = self.get_first_data_by_key(self.dataset_list, key) self.result = OptimizeResult() self.html_render = HTMLRender() @@ -54,21 +54,21 @@ class OpDispatchAnalyzer(BaseAnalyzer): self.make_render(self.html_render) return self.result - def get_op_compile_info(self, event_dataset: TimelineEventDataset): - """ - :Param event_dataset: dataset of timeline event - """ - if hasattr(event_dataset, "ops_compile"): - self._op_compile = getattr(event_dataset, "ops_compile") - if not self._op_compile or self._op_compile.total_count < const.MAX_OP_COMPILE_NUM: - return + def get_op_compile_info(self, event_dataset: ScheduleAnalysisDataset): + """ + :Param event_dataset: dataset of timeline event + """ + if hasattr(event_dataset, "ops_compile"): + self._op_compile = getattr(event_dataset, "ops_compile") + if not self._op_compile or self._op_compile.total_count < const.MAX_OP_COMPILE_NUM: + return - self._issues_record.append(['operator dispatch', - const.OP_COMPILE_ID, - self._op_compile.total_count, - self._op_compile.total_time]) - else: - logger.debug("Skip operator compile checker, because no op_compile attr find.") + self._issues_record.append(['operator dispatch', + const.OP_COMPILE_ID, + self._op_compile.total_count, + self._op_compile.total_time]) + else: + logger.debug("Skip operator compile checker, because no op_compile attr find.") def make_record(self, result: OptimizeResult): """ @@ -77,8 +77,9 @@ class OpDispatchAnalyzer(BaseAnalyzer): if not self._op_compile or len(self._issues_record) <= 0: return desc = f"Found {self._op_compile.total_count} operator compile issues." - suggestion = (f"Please use `torch_npu.npu.set_compile_mode(jit_compile=False)` to disable jit compile " - f"in dynamic shape usage.") + suggestion = ("Please place the following code at the entrance of the python script to disable jit compile. 
" \ + "Code: `torch_npu.npu.set_compile_mode(jit_compile=False); " + "torch_npu.npu.config.allow_internal_format = False`") self.optimization_item.append(OptimizeItem("Operator dispatch", desc, [suggestion])) for optimization in self.optimization_item: result.add(OptimizeRecord(optimization)) @@ -87,7 +88,7 @@ class OpDispatchAnalyzer(BaseAnalyzer): for op_info in self._issues_record: result.add_detail('operator dispatch', detail=op_info) - def make_render(self, html_render): + def make_render(self, html_render, **kwargs): issues = [] optimizations = [] for optimization in self.optimization_item: @@ -97,11 +98,20 @@ class OpDispatchAnalyzer(BaseAnalyzer): )) for record in self._issues_record: issues.append(dict(issue=record[0], - op_name=record[1], - counts=record[2], - total_time=record[3])) + op_name=record[1], + counts=record[2], + total_time=record[3])) html_render.render_template(key="schedule", template_dir="templates", template_name="operator_dispatch.html", issues=issues, - optimizers=optimizations) + optimizers=optimizations, + priority_background_color=self.get_priority()) + + def get_priority(self): + step_duration = getattr(self.dataset, "step_duration", None) + op_compile_total_dur = getattr(self._op_compile, "total_time", None) + if step_duration is None or op_compile_total_dur is None: + return PriorityBackgroundColor.low + + return self.get_priority_by_time_ratio(op_compile_total_dur, step_duration) diff --git a/profiler/advisor/analyzer/schedule/fusion_ops/fusion_ops_analyzer.py b/profiler/advisor/analyzer/schedule/fusion_ops/fusion_ops_analyzer.py index 1a6dd194f..305d23994 100644 --- a/profiler/advisor/analyzer/schedule/fusion_ops/fusion_ops_analyzer.py +++ b/profiler/advisor/analyzer/schedule/fusion_ops/fusion_ops_analyzer.py @@ -8,25 +8,29 @@ from profiler.advisor.analyzer.base_analyzer import BaseAnalyzer from profiler.advisor.common import constant as const from profiler.advisor.common.analyzer_scopes import SupportedScopes from profiler.advisor.common.timeline.event import TimelineEvent -from profiler.advisor.dataset.timeline_event_dataset import TimelineEventDataset +from profiler.advisor.dataset.timeline_event_dataset import ScheduleAnalysisDataset from profiler.advisor.result.item import OptimizeItem, OptimizeRecord from profiler.advisor.utils.utils import format_timeline_result from profiler.advisor.common.timeline.fusion_ops_db import init_timeline_ops_db +from profiler.advisor.display.html.priority_background_color import PriorityBackgroundColor logger = logging.getLogger() class TimelineFusionOpsAnalyzer(BaseAnalyzer): - dataset_cls_list = [TimelineEventDataset] + dataset_cls_list = [ScheduleAnalysisDataset] def __init__(self, collection_path, n_processes: int = 1, **kwargs): super().__init__(collection_path, n_processes, **kwargs) self._matched_op_index = {} if self.n_processes <= 1 else multiprocessing.Manager().dict() self.matched_op_stacks = {} self.empty_stacks = True - key = TimelineEventDataset.get_key() + key = ScheduleAnalysisDataset.get_key() self.timeline_event_dataset = self.get_first_data_by_key(self.dataset_list, key) + def get_priority(self): + return PriorityBackgroundColor.low + def optimize(self, **kwargs): for mode in [const.ATEN.lower(), const.OPTIMIZER.lower()]: @@ -186,7 +190,8 @@ class TimelineFusionOpsAnalyzer(BaseAnalyzer): empty_stacks=self.empty_stacks, with_stack_doc_url=const.TIMELINE_WITH_STACK_DOC_URL, api_doc_url=const.TIMELINE_API_DOC_URL, - result=format_result_for_html) + result=format_result_for_html, + 
priority_background_color=self.get_priority()) def query_stack(self, event_dataset): if all([len(matched_index) == 0 for matched_index in self._matched_op_index.values()]): diff --git a/profiler/advisor/analyzer/schedule/fusion_ops/timeline_api_stack_checker.py b/profiler/advisor/analyzer/schedule/fusion_ops/timeline_api_stack_checker.py index f684a4892..d96421ead 100644 --- a/profiler/advisor/analyzer/schedule/fusion_ops/timeline_api_stack_checker.py +++ b/profiler/advisor/analyzer/schedule/fusion_ops/timeline_api_stack_checker.py @@ -3,7 +3,7 @@ from typing import List from profiler.advisor.common import constant as const from profiler.advisor.common.timeline.event import TimelineEvent -from profiler.advisor.dataset.timeline_event_dataset import TimelineEventDataset +from profiler.advisor.dataset.timeline_event_dataset import ComputationAnalysisDataset from profiler.advisor.result.result import OptimizeResult from profiler.advisor.result.item import OptimizeItem, OptimizeRecord from profiler.advisor.utils.utils import get_analyze_processes, ParallelJob @@ -21,7 +21,8 @@ class OpStackFinder: self.task_type = None self.matched_index = set() - def get_api_stack_by_op(self, event_dataset: TimelineEventDataset, op_name: List[str] = None, task_type: str = None, + def get_api_stack_by_op(self, event_dataset: ComputationAnalysisDataset, op_name: List[str] = None, + task_type: str = None, disable_multiprocess=False): """ :Param event_dataset: dataset of timeline event @@ -82,7 +83,7 @@ class OpStackFinder: for op_info in self._stack_record: result.add_detail('operator stacks', detail=op_info) - def _get_api_stack_by_op(self, event_dataset: TimelineEventDataset, op_name: str, task_type: str): + def _get_api_stack_by_op(self, event_dataset: ComputationAnalysisDataset, op_name: str, task_type: str): for _, src_op_event in event_dataset.ops_with_task_type.items(): op_task_type = src_op_event.get(const.TASK_TYPE) @@ -110,6 +111,7 @@ class OpStackFinder: task_id = src_op_event.task_id if not task_id: continue + self.matched_index.add(dst_op_index) if dst_op_index not in self._task_id_record: self._task_id_record[dst_op_index] = [] @@ -122,7 +124,7 @@ class OpStackFinder: if not dst_op_event: return const.TIMELINE_BACKWARD_NO_STACK_CODE - return dst_op_event.get("dataset_index") + return int(dst_op_event.get("dataset_index")) def _query_index_by_acl_to_npu(self, acl_to_npu_event): if acl_to_npu_event: @@ -148,6 +150,7 @@ class OpStackFinder: return None event = TimelineEvent(event) stack = event.args.get(const.CALL_STACKS) + stack = stack if stack else const.NO_STACK_REASON_MAP.get(const.TIMELINE_BACKWARD_NO_STACK_CODE) for matched_op_info in self._task_id_record.get(index, []): self._stack_record.append([*matched_op_info, stack]) @@ -157,7 +160,8 @@ class OpStackFinder: const.NO_STACK_REASON_MAP.get(const.TIMELINE_ACL_TO_NPU_NO_STACK_CODE)]) return None - def query_stack(self, event_dataset: TimelineEventDataset): + def query_stack(self, event_dataset: ComputationAnalysisDataset): + if not event_dataset.dataset_len: return _ = event_dataset.parse_data_with_generator(self._query_stack_by_matched_index) diff --git a/profiler/advisor/analyzer/schedule/syncbn/syncbn_analyzer.py b/profiler/advisor/analyzer/schedule/syncbn/syncbn_analyzer.py index 2786a7840..df8c22fa5 100644 --- a/profiler/advisor/analyzer/schedule/syncbn/syncbn_analyzer.py +++ b/profiler/advisor/analyzer/schedule/syncbn/syncbn_analyzer.py @@ -1,30 +1,32 @@ import logging -from typing import List, Dict, Any - from 
profiler.advisor.analyzer.base_analyzer import BaseAnalyzer from profiler.advisor.result.result import OptimizeResult from profiler.advisor.analyzer.schedule.syncbn.syncbn_checker import SyncBNChecker +from profiler.advisor.display.html.priority_background_color import PriorityBackgroundColor from profiler.advisor.display.html.render import HTMLRender -from profiler.advisor.dataset.timeline_event_dataset import TimelineEventDataset +from profiler.advisor.dataset.timeline_event_dataset import ScheduleAnalysisDataset logger = logging.getLogger() class SyncBNAnalyzer(BaseAnalyzer): - dataset_cls_list = [TimelineEventDataset] + dataset_cls_list = [ScheduleAnalysisDataset] def __init__(self, collection_path, **kwargs): super().__init__(collection_path, **kwargs) self.result = OptimizeResult() self.html_render = HTMLRender() - key = TimelineEventDataset.get_key() + key = ScheduleAnalysisDataset.get_key() self.timeline_event_dataset = self.get_first_data_by_key(self.dataset_list, key) - @BaseAnalyzer.check_data((TimelineEventDataset.get_key(),)) + @BaseAnalyzer.check_data((ScheduleAnalysisDataset.get_key(),)) def optimize(self, **kwargs): syncbn_checker = SyncBNChecker() syncbn_checker.check_syncbn(self.timeline_event_dataset) syncbn_checker.make_record(self.result) - syncbn_checker.make_render(self.html_render) + syncbn_checker.make_render(self.html_render, priority=self.get_priority()) return self.result + + def get_priority(self): + return PriorityBackgroundColor.high \ No newline at end of file diff --git a/profiler/advisor/analyzer/schedule/syncbn/syncbn_checker.py b/profiler/advisor/analyzer/schedule/syncbn/syncbn_checker.py index c0e10448f..e83a15491 100644 --- a/profiler/advisor/analyzer/schedule/syncbn/syncbn_checker.py +++ b/profiler/advisor/analyzer/schedule/syncbn/syncbn_checker.py @@ -1,7 +1,7 @@ import logging import os -from profiler.advisor.dataset.timeline_event_dataset import TimelineEventDataset +from profiler.advisor.dataset.timeline_event_dataset import ScheduleAnalysisDataset from profiler.advisor.result.result import OptimizeResult from profiler.advisor.result.item import OptimizeItem, OptimizeRecord from profiler.cluster_analyse.common_func.file_manager import FileManager @@ -20,7 +20,7 @@ class SyncBNChecker: self.max_syncbn_num = None self._init_rule() - def check_syncbn(self, event_dataset: TimelineEventDataset): + def check_syncbn(self, event_dataset: ScheduleAnalysisDataset): """ :Param event_dataset: dataset of timeline event """ @@ -43,14 +43,17 @@ class SyncBNChecker: for optimization in self.optimization_item: result.add(OptimizeRecord(optimization)) - def make_render(self, html_render): + def make_render(self, html_render, **kwargs): if not self.syncbn_issues: return + + priority = kwargs.get("priority") html_render.render_template(key="schedule", template_dir="templates", template_name="sync_batchnorm.html", desc=self.desc, - solutions=self.solutions) + solutions=self.solutions, + priority_background_color=priority) def _init_rule(self): syncbn_rule_path = os.path.join( diff --git a/profiler/advisor/analyzer/schedule/synchronize_stream/synchronize_stream_analyzer.py b/profiler/advisor/analyzer/schedule/synchronize_stream/synchronize_stream_analyzer.py index d8906504c..61ec7d1fa 100644 --- a/profiler/advisor/analyzer/schedule/synchronize_stream/synchronize_stream_analyzer.py +++ b/profiler/advisor/analyzer/schedule/synchronize_stream/synchronize_stream_analyzer.py @@ -5,28 +5,33 @@ from typing import List, Dict, Any from profiler.advisor.analyzer.base_analyzer 
import BaseAnalyzer from profiler.advisor.result.result import OptimizeResult from profiler.advisor.analyzer.schedule.synchronize_stream.synchronize_stream_checker import SynchronizeStreamChecker +from profiler.advisor.display.html.priority_background_color import PriorityBackgroundColor from profiler.advisor.display.html.render import HTMLRender -from profiler.advisor.dataset.timeline_event_dataset import TimelineEventDataset +from profiler.advisor.dataset.timeline_event_dataset import ScheduleAnalysisDataset logger = logging.getLogger() class SynchronizeStreamAnalyzer(BaseAnalyzer): - dataset_cls_list = [TimelineEventDataset] + dataset_cls_list = [ScheduleAnalysisDataset] def __init__(self, collection_path, **kwargs): super().__init__(collection_path, **kwargs) self.result = OptimizeResult() self.html_render = HTMLRender() - key = TimelineEventDataset.get_key() + key = ScheduleAnalysisDataset.get_key() self.timeline_event_dataset = self.get_first_data_by_key(self.dataset_list, key) - @BaseAnalyzer.check_data((TimelineEventDataset.get_key(),)) + @BaseAnalyzer.check_data((ScheduleAnalysisDataset.get_key(),)) def optimize(self, **kwargs): synchronize_stream_checker = SynchronizeStreamChecker() synchronize_stream_checker.check_synchronize(self.timeline_event_dataset, kwargs.get("profiling_with_stack")) synchronize_stream_checker.make_record(self.result) - synchronize_stream_checker.make_render(self.html_render) + synchronize_stream_checker.make_render(self.html_render, priority=self.get_priority()) return self.result + + + def get_priority(self): + return PriorityBackgroundColor.low \ No newline at end of file diff --git a/profiler/advisor/analyzer/schedule/synchronize_stream/synchronize_stream_checker.py b/profiler/advisor/analyzer/schedule/synchronize_stream/synchronize_stream_checker.py index 83ddd80a0..7af46f766 100644 --- a/profiler/advisor/analyzer/schedule/synchronize_stream/synchronize_stream_checker.py +++ b/profiler/advisor/analyzer/schedule/synchronize_stream/synchronize_stream_checker.py @@ -1,7 +1,7 @@ import logging from profiler.advisor.common import constant as const -from profiler.advisor.dataset.timeline_event_dataset import TimelineEventDataset +from profiler.advisor.dataset.timeline_event_dataset import ScheduleAnalysisDataset from profiler.advisor.result.result import OptimizeResult from profiler.advisor.result.item import OptimizeItem, OptimizeRecord from profiler.advisor.analyzer.schedule.timeline_base_checker import TimelineBaseChecker @@ -21,7 +21,7 @@ class SynchronizeStreamChecker(TimelineBaseChecker): self.solutions = [] self.max_synchronize_num = None - def check_synchronize(self, event_dataset: TimelineEventDataset, profiling_with_stack=None): + def check_synchronize(self, event_dataset: ScheduleAnalysisDataset, profiling_with_stack=None): """ :Param event_dataset: dataset of timeline event """ @@ -73,10 +73,10 @@ class SynchronizeStreamChecker(TimelineBaseChecker): for optimization in self.optimization_item: result.add(OptimizeRecord(optimization)) - def make_render(self, html_render): + def make_render(self, html_render, **kwargs): if not self.synchronize_issues: return - + priority = kwargs.get("priority") format_result_for_html = format_timeline_result(dict(self.matched_op_stacks), dump_html=True) html_render.render_template(key="schedule", template_dir="templates", @@ -86,4 +86,5 @@ class SynchronizeStreamChecker(TimelineBaseChecker): result=format_result_for_html, with_stack_doc_url=const.TIMELINE_WITH_STACK_DOC_URL, empty_stacks=self.empty_stacks, - 
framework_black_list=self.framework_black_list) + framework_black_list=self.framework_black_list, + priority_background_color=priority) diff --git a/profiler/advisor/common/constant.py b/profiler/advisor/common/constant.py index e44619ee2..dd731dd53 100644 --- a/profiler/advisor/common/constant.py +++ b/profiler/advisor/common/constant.py @@ -33,6 +33,7 @@ TASK_TYPE = "Task Type" CPU_OP = "cpu_op" AI_CORE = "AI_CORE" AI_CPU = "AI_CPU" +MIX_AIC = "MIX_AIC" CALL_STACKS = "Call stack" INPUT_DIMS = "Input Dims" OP_SEP = "-" @@ -149,5 +150,7 @@ PROFILER_INFO_FILE_PATTERN = r"profiler_info_(\d+)\.json" DISABLE_STREAMINIG_READER = "DISABLE_STREAMINIG_READER" FRAMEWORK_STACK_BLACK_LIST = ["torch", "torch_npu", "megatron", "deepspeed"] DISABLE_STREAMING_READER = "DISABLE_STREAMING_READER" -MAX_FILE_SIZE = 10**10 -MAX_NUM_PROCESSES = 4 \ No newline at end of file +MAX_FILE_SIZE = 10 ** 10 +MAX_NUM_PROCESSES = 4 +DEFAULT_STEP = "-1" +STEP_RANK_SEP = "_" diff --git a/profiler/advisor/dataset/cluster/cluster_dataset.py b/profiler/advisor/dataset/cluster/cluster_dataset.py index 1f299afc5..cd7507962 100644 --- a/profiler/advisor/dataset/cluster/cluster_dataset.py +++ b/profiler/advisor/dataset/cluster/cluster_dataset.py @@ -1,3 +1,17 @@ +# Copyright (c) 2024, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
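The DEFAULT_STEP and STEP_RANK_SEP constants added to constant.py above are used below (and in the slow_rank/slow_link analyzers) to key records by step and rank. A rough illustration of the round trip, with made-up values:

STEP_RANK_SEP = "_"    # from profiler/advisor/common/constant.py above
DEFAULT_STEP = "-1"

step, rank_id = "", "7"                                        # a record with no step information
key = STEP_RANK_SEP.join([step or DEFAULT_STEP, rank_id])      # "-1_7"
parsed_step, parsed_rank = map(int, key.split(STEP_RANK_SEP))  # (-1, 7)

The underscore separator keeps the key parseable when the step falls back to "-1", which the previous "-" separator could not split cleanly.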
import logging import os @@ -11,6 +25,7 @@ from profiler.cluster_analyse.common_func.constant import Constant from collections import defaultdict from profiler.cluster_analyse.cluster_analysis import Interface from profiler.advisor.dataset.cluster.cluster_step_trace_time_bean import ClusterStepTraceTimeBean +from profiler.advisor.dataset.cluster.hccl_collection import HcclInfo logger = logging.getLogger() @@ -87,7 +102,15 @@ class ClusterStepTraceTimeDataset(ClusterDataset): step_dict = defaultdict(lambda: [0, 0, 0]) for step_bean in step_data: if step_bean.type == self.RANK: - step_rank_index = f"{step_bean.step}-{step_bean.index}" + step_rank_record = [] + step = str(step_bean.step).replace(" ", "") or str(const.DEFAULT_STEP) + rank = str(step_bean.index).replace(" ", "") + if step: + step_rank_record.append(step) + if rank: + step_rank_record.append(rank) + + step_rank_index = const.STEP_RANK_SEP.join(step_rank_record) step_dict[step_rank_index][0] += step_bean.compute step_dict[step_rank_index][1] += step_bean.communication step_dict[step_rank_index][2] += step_bean.free @@ -104,6 +127,7 @@ class ClusterStepTraceTimeDataset(ClusterDataset): def get_stages(self): return sorted(self._stages) + @singleton class ClusterCommunicationDataset(ClusterDataset): RDMA_TIME_MS = "RDMA time(ms)" @@ -125,6 +149,7 @@ class ClusterCommunicationDataset(ClusterDataset): self.SDMA_TIME_MS: 0, self.SDMA_SIZE_MB: 0, }) + self.hccl_dict = defaultdict(lambda: defaultdict(lambda: defaultdict(list))) super().__init__(collection_path, data) @staticmethod @@ -147,9 +172,26 @@ class ClusterCommunicationDataset(ClusterDataset): def process(self, communication_json: dict): for comm_group, group_dict in communication_json.items(): + if self.hccl_dict.get(comm_group) is None: + self.hccl_dict.setdefault(comm_group, defaultdict(lambda: defaultdict(list))) for step, step_dict in group_dict.items(): for op, op_dict in step_dict.items(): - self.compute_bandwidth(step.lower().lstrip("step"), op_dict) + self.compute_bandwidth(step.lower().lstrip("step") or str(const.DEFAULT_STEP), op_dict) + self.process_hccl_info(comm_group, step, op, op_dict) + + def process_hccl_info(self, group, step, op, op_dict): + op_name = op.split("@")[0] + for rank_id, rank_dict in op_dict.items(): + try: + hccl_info = HcclInfo(group, step, rank_id, op, rank_dict) + if self.hccl_dict[group].get(op_name) is None: + self.hccl_dict[group].setdefault(op_name, defaultdict(list)) + if self.hccl_dict[group][op_name].get(step) is None: + self.hccl_dict[group][op_name].setdefault(step, list()) + self.hccl_dict[group][op_name][step].append(hccl_info) + except ValueError as e: + msg = "[ERROR] Cluster_communication.json has invalid structure." 
+ raise ValueError(msg) from e def compute_bandwidth(self, step, op_dict: dict): for rank_id, rank_dict in op_dict.items(): @@ -160,13 +202,13 @@ class ClusterCommunicationDataset(ClusterDataset): raise ValueError(msg) from e for comm_type, bw_dict in rank_dict.get(self.COMMUNICATION_BANDWIDTH_INFO, {}).items(): if comm_type == self.SDMA: - self.rank_bw_dict[f"{step}-{rank}"][self.SDMA_SIZE_MB] += bw_dict.get(self.TRANSIT_SIZE) - self.rank_bw_dict[f"{step}-{rank}"][self.SDMA_TIME_MS] += bw_dict.get(self.TRANSIT_TIME) + self.rank_bw_dict[f"{step}{const.STEP_RANK_SEP}{rank}"][self.SDMA_SIZE_MB] += bw_dict.get(self.TRANSIT_SIZE) + self.rank_bw_dict[f"{step}{const.STEP_RANK_SEP}{rank}"][self.SDMA_TIME_MS] += bw_dict.get(self.TRANSIT_TIME) if comm_type == self.RDMA: - self.rank_bw_dict[f"{step}-{rank}"][self.RDMA_SIZE_MB] += bw_dict.get(self.TRANSIT_SIZE) - self.rank_bw_dict[f"{step}-{rank}"][self.RDMA_TIME_MS] += bw_dict.get(self.TRANSIT_TIME) + self.rank_bw_dict[f"{step}{const.STEP_RANK_SEP}{rank}"][self.RDMA_SIZE_MB] += bw_dict.get(self.TRANSIT_SIZE) + self.rank_bw_dict[f"{step}{const.STEP_RANK_SEP}{rank}"][self.RDMA_TIME_MS] += bw_dict.get(self.TRANSIT_TIME) - for step_rank, rank_dict in self.rank_bw_dict.items(): + for step_rank in self.rank_bw_dict.keys(): self.rank_bw_dict[step_rank][self.RDMA_BANDWIDTH] = self.compute_ratio( self.rank_bw_dict[step_rank][self.RDMA_SIZE_MB], self.rank_bw_dict[step_rank][self.RDMA_TIME_MS]) self.rank_bw_dict[step_rank][self.SDMA_BANDWIDTH] = self.compute_ratio( diff --git a/profiler/advisor/dataset/timeline_event_dataset.py b/profiler/advisor/dataset/timeline_event_dataset.py index 71419dbbd..147726e38 100644 --- a/profiler/advisor/dataset/timeline_event_dataset.py +++ b/profiler/advisor/dataset/timeline_event_dataset.py @@ -1,128 +1,46 @@ -import json +import inspect import logging -import os -from typing import List, Any import traceback +from collections import OrderedDict import ijson from tqdm import tqdm -import yaml from profiler.advisor.common import constant as const from profiler.advisor.common.timeline.event import TimelineEvent -from profiler.advisor.utils.utils import get_file_path_from_directory, check_path_valid, singleton -from profiler.cluster_analyse.common_func.file_manager import FileManager +from profiler.advisor.utils.utils import get_file_path_from_directory, check_path_valid, singleton, convert_to_float +from profiler.advisor.dataset.timeline_op_collector.timeline_op_collector import ( + OpCompileCollector, + SynchronizeStreamCollector, + MemCollector, + DataloaderCollector, + SyncBNCollector, + AtenCollector, + OptimizerCollector, + FrequencyCollector, + SpecificTaskTypeOpCollector, + TorchToNpuCollector, + AclToNpuCollector, + OpStackCollector, + StepCollector +) logger = logging.getLogger() -class OpCompileCollector: - def __init__(self): - self._total_op_compile_counter = 0 - self._total_op_compile_time = 0.0 +class BaseTimelineEventDataset: + PROFILER_STEP_PREFIX = "ProfilerStep" - @property - def total_time(self): - return self._total_op_compile_time - - @property - def total_count(self): - return self._total_op_compile_counter - - def is_empty(self): - return self._total_op_compile_counter == 0 - - def update(self, event: TimelineEvent): - self._total_op_compile_time += float(event.dur) - self._total_op_compile_counter += 1 - - def unset(self): - self._total_op_compile_counter = 0 - self._total_op_compile_time = 0.0 - - -class SynchronizeStreamCollector: - - def __init__(self): - self._synchronize_stream_count = 0 - 
self._slow_synchronize_stream = [] - self.rule = SynchronizeStreamCollector._load_rule() - - @property - def total_count(self): - return self._synchronize_stream_count - - @property - def slow_synchronize_stream(self): - return self._slow_synchronize_stream - - @staticmethod - def _load_rule(): - sync_stream_rule_path = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "rules", - "synchronize.yaml") - - sync_stream_rule = FileManager.read_yaml_file(sync_stream_rule_path) - return sync_stream_rule - - def update_sync_stream_count(self): - self._synchronize_stream_count += 1 - - def append_slow_sync_stream(self, event): - if float(event.dur) / 1000 >= self.rule.get("slow_synchronize_threshold", 10): - self._slow_synchronize_stream.append(event) - - def unset(self): - self._synchronize_stream_count = 0 - self._slow_synchronize_stream = [] - - -class MemCollector: - MEMORY_OP_NAME = ["AscendCL@aclMallocMemInner", "AscendCL@aclrtFreePhysical"] - - def __init__(self): - self.mem_op_info = {} - self.rule = self._load_rule() - - @staticmethod - def _load_rule(): - memory_rule_path = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "rules", - "memory.yaml") - - memory_rule = FileManager.read_yaml_file(memory_rule_path) - return memory_rule - - def append_mem_op(self, event): - if event.name not in self.MEMORY_OP_NAME: - return - - if event.name not in self.mem_op_info: - self.mem_op_info[event.name] = dict(count=0, total_dur=0) - self.mem_op_info[event.name]["count"] += 1 - self.mem_op_info[event.name]["total_dur"] += float(event.dur) - - -@singleton -class TimelineEventDataset: + collector_map = {} def __init__(self, collection_path, data: dict, build_dataset=True, **kwargs) -> None: - self._ops_with_task_type = {} - self._ops_with_stack = {} - self._ops_compile = OpCompileCollector() - self._memory_ops = MemCollector() - self._torch_to_npu = {} - self._acl_to_npu = set() - self._aten: List[Any] = [] - self._optimizer: List[Any] = [] - self._dataloader: List[Any] = [] - self._sync_batchnorm: List[Any] = [] - self._synchronize_stream = SynchronizeStreamCollector() self.timeline_dir = collection_path + self.profiler_step = [] self.timeline_data_list = get_file_path_from_directory(collection_path, lambda file: file.endswith("trace_view.json")) self.dataset_len = None - self.analysis_mode = kwargs.get("analysis_mode") - self.task_type = kwargs.get("task_type") - + self.step = kwargs.get("step") + self.step_duration = None if not build_dataset: return @@ -132,59 +50,6 @@ class TimelineEventDataset: data[key] = [] data[key].append(self) - if self.analysis_mode in ["op_stack", "all"]: - self._task_op_names = list(set([event_key.split("-")[0] for event_key in self._ops_with_task_type.keys()])) - - self._post_process() - - @property - def ops_with_stack(self): - return self._ops_with_stack - - @property - def ops_compile(self): - return self._ops_compile - - @property - def memory_ops(self): - return self._memory_ops - - @property - def torch_to_npu(self): - return self._torch_to_npu - - @property - def acl_to_npu(self): - return self._acl_to_npu - - @property - def ops_with_task_type(self): - return self._ops_with_task_type - - @property - def task_op_names(self): - return self._task_op_names - - @property - def optimizer(self): - return self._optimizer - - @property - def aten(self): - return self._aten - - @property - def dataloader(self): - return self._dataloader - - @property - def sync_batchnorm(self): - return self._sync_batchnorm - - @property - 
def synchronize_stream(self): - return self._synchronize_stream - @classmethod def get_key(cls): """ @@ -193,6 +58,23 @@ class TimelineEventDataset: """ return cls.__module__.rsplit('.', maxsplit=1)[-1] + def get_post_process_kwargs(self, func_name): + kwargs = {} + if func_name == FrequencyCollector.__name__: + ops_with_task_type = getattr(self, "ops_with_task_type", {}).values() + kwargs["ai_core_ops"] = [op for op in ops_with_task_type if + op.get(const.TASK_TYPE) in [const.AI_CORE, const.MIX_AIC]] + return kwargs + + def add_event(self, index, event): + event["dataset_index"] = index + if not isinstance(event, TimelineEvent): + event = TimelineEvent(event) + + for _, collector in self.collector_map.items(): + collector.add_op(event) + return True + def parse(self): if len(self.timeline_data_list) == 0: @@ -203,7 +85,7 @@ class TimelineEventDataset: logger.warning("Found multiple trace_view.json in %s, load the file of device 0 for analysis .", self.timeline_dir) - result = self.parse_data_with_generator(self._add_event) + result = self.parse_data_with_generator(self.add_event) if not self.dataset_len: self.dataset_len = len(result) @@ -229,136 +111,96 @@ class TimelineEventDataset: timeline_data_path) return result - def _add_ops_with_task_type(self, event): - key = f"{event.name}-{event.ts}" - self._ops_with_task_type[key] = TimelineEvent( - { - const.TASK_TYPE: event.args.get(const.TASK_TYPE), - "task_id": event.args.get("Task Id"), - "tid": event.tid, - "name": event.name, - "ts": str(event.ts) - } - ) - - def _add_ops_with_stack(self, event): - self._ops_with_stack[str(event.ts)] = TimelineEvent({"name": event.name, "dataset_index": event.dataset_index}) - - def _add_torch_to_npu(self, event): - key = f"{event.ph}-{event.id}" - self._torch_to_npu[key] = TimelineEvent({"tid": event.tid, "ts": str(event.ts)}) - - def _add_acl_to_npu(self, event): - # op with task type equals to ai_cpu which derived from acl_to_npu do not have stacks - self._acl_to_npu.add(str(event.ts)) - - def _add_op_compile(self, event: TimelineEvent): - if event.name == const.OP_COMPILE_NAME or event.args.get("id") == const.OP_COMPILE_ID: - self._ops_compile.update(event) - - def _add_optimizer(self, event: TimelineEvent): - self._optimizer.append(TimelineEvent({"name": event.name, "dataset_index": event.dataset_index})) - - def _add_aten(self, event: TimelineEvent): - self._aten.append(TimelineEvent({ - "name": event.name, "dataset_index": event.dataset_index, "ts": event.ts, "dur": event.dur - })) - - def _add_dataloader(self, event: TimelineEvent): - if "dataloader" in event.name.lower(): - self._dataloader.append(TimelineEvent({ - "name": event.name, "dataset_index": event.dataset_index, "ts": event.ts, "dur": event.dur, - "stack": event.args.get("Call stack") - })) - - def _add_sync_batchnorm(self, event: TimelineEvent): - if event.name.lower() == "syncbatchnorm": - self._sync_batchnorm.append(TimelineEvent({ - "name": event.name, "dataset_index": event.dataset_index, "ts": event.ts, "dur": event.dur - })) - - def _add_synchronize(self, event: TimelineEvent): - if event.name.startswith(const.SYNC_STREAM): - self._synchronize.append(TimelineEvent({ - "name": event.name, "ts": event.ts, "dur": event.dur - })) - - def _add_memory_ops(self, event: TimelineEvent): - self._memory_ops.append_mem_op(event) - - def _add_specific_operator(self, event): - # for analysis of operator aclOpCompile, enable jit_compILE=False - self._add_op_compile(event) - # for analysis of slow dataloader.__next__ - 
self._add_dataloader(event) - # for analysis of syncBatchNorm operator, prompt users to replace source code of torch_npu's syncbn - self._add_sync_batchnorm(event) - # for analysis of memory - self._add_memory_ops(event) - - def _add_event(self, index, event): - event["dataset_index"] = index - if not isinstance(event, TimelineEvent): - event = TimelineEvent(event) - - self._add_specific_operator(event) - - if self.analysis_mode == "fusion_ops": - self._add_event_for_fusion_ops(event) - elif self.analysis_mode == "op_stack": - self._add_event_for_op_stack(event) + def _get_target_ops_by_step(self, op_list): + target_ops = [] + if not self.step or f"ProfilerStep#{self.step}" not in [event.name for event in self.profiler_step]: + target_ops = op_list else: - self._add_event_for_fusion_ops(event) - self._add_event_for_op_stack(event) - return True + for step_event in self.profiler_step: + if step_event.name != f"ProfilerStep#{self.step}": + continue + self.step_duration = convert_to_float(step_event.dur) + for op_event in op_list: + if step_event.ts_include(op_event): + target_ops.append(op_event) + target_ops.sort(key=lambda x: convert_to_float(x.ts)) + return target_ops + + def _collector_post_process(self): + # 按step过滤collector中的算子,并将过滤后的算子设置为当前dataset的property,与原始TimelineEventDataset的property保持一致 + for collector_name, collector in self.collector_map.items(): + logger.debug("Start post process for operator collector: %s", collector_name) + if collector.require_filter_by_step: + logger.debug("Operator Collector %s requires filter ops by step %s", collector_name, self.step) + target_op_list = self._get_target_ops_by_step(collector.op_list) + else: + logger.debug("Operator Collector %s use operators of all step for analysis", collector_name) + target_op_list = collector.op_list - def _add_event_for_fusion_ops(self, event): - if event.name.lower().startswith(f"{const.ATEN}{const.ATEN_SEP}") or event.name.lower().startswith( - f"{const.NPU}{const.ATEN_SEP}"): - self._add_aten(event) - return + logger.debug("Source number of ops is %s, number of ops after filtered by rank is %s", + len(collector.op_list), len(target_op_list)) - # 检查cann层同步操作,根据时间窗口索引到host侧的aten算子并给出堆栈 - if event.name.startswith(const.SYNC_STREAM): - self._add_aten(event) + print(f"collector_name is {collector_name}") + collector_kwargs = self.get_post_process_kwargs(collector_name) + collector.post_process(target_op_list, **collector_kwargs) + for property_name, property_value in collector.attribute_to_dataset.items(): + setattr(self, property_name, property_value) - if event.name.startswith(f"{const.OPTIMIZER}.{const.OPTIMIZER_STEP}{const.OPTIMIZER_SEP}"): - self._add_optimizer(event) - return - - def _add_event_for_op_stack(self, event): - if event.name.lower() == const.TORCH_TO_NPU: - self._add_torch_to_npu(event) - return - if event.args.get(const.CALL_STACKS): - self._add_ops_with_stack(event) - return - - if event.args.get(const.TASK_TYPE) and event.args.get(const.TASK_TYPE) in [const.AI_CORE, const.AI_CPU]: - self._add_ops_with_task_type(event) - return +@singleton +class ScheduleAnalysisDataset(BaseTimelineEventDataset): + collector_map = OrderedDict( + StepCollector=StepCollector(), + MemCollector=MemCollector(), + OpCompileCollector=OpCompileCollector(), + SynchronizeStreamCollector=SynchronizeStreamCollector(), + DataloaderCollector=DataloaderCollector(), + SyncBNCollector=SyncBNCollector(), + AtenCollector=AtenCollector(), + OptimizerCollector=OptimizerCollector() + ) - if event.name and event.ts and event.name == 
const.ACL_TO_NPU: - self._add_acl_to_npu(event) - return + def __init__(self, collection_path, data: dict, build_dataset=True, **kwargs) -> None: + super().__init__(collection_path, data, build_dataset, **kwargs) + self.aten = None + self.synchronize_stream = None + self._collector_post_process() + self._post_process() def _post_process(self): # eliminate sub aten operator of the first level aten operator by 'ts' and 'dur', # keep the first level aten operator contiguous formated_atens = [] - for event in sorted(self._aten, key=lambda x: x.get("ts", -1)): + if not hasattr(self, "aten") or not hasattr(self, "synchronize_stream"): + return + + for event in sorted(self.aten, key=lambda x: x.get("ts", -1)): if event.name.startswith(const.ATEN): if not formated_atens or not formated_atens[-1].ts_include(event): formated_atens.append(event) elif event.name.startswith(const.SYNC_STREAM): - self._synchronize_stream.update_sync_stream_count() - if formated_atens[-1].ts_include(event): + self.synchronize_stream.update_sync_stream_count() + if formated_atens and formated_atens[-1].ts_include(event): # 使用aten算子的索引,用于查询堆栈 event["dataset_index"] = formated_atens[-1].get("dataset_index") - self._synchronize_stream.append_slow_sync_stream(event) + self.synchronize_stream.append_slow_sync_stream(event) else: continue - self._aten = formated_atens + self.aten = formated_atens + + +class ComputationAnalysisDataset(BaseTimelineEventDataset): + collector_map = OrderedDict( + StepCollector=StepCollector(), + SpecificTaskTypeOpCollector=SpecificTaskTypeOpCollector(), + TorchToNpuCollector=TorchToNpuCollector(), + AclToNpuCollector=AclToNpuCollector(), + OpStackCollector=OpStackCollector(), + FrequencyCollector=FrequencyCollector(), + ) + + def __init__(self, collection_path, data: dict, build_dataset=True, **kwargs) -> None: + super().__init__(collection_path, data, build_dataset, **kwargs) + self._collector_post_process() diff --git a/profiler/advisor/dataset/timeline_op_collector/__init__.py b/profiler/advisor/dataset/timeline_op_collector/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/profiler/advisor/dataset/timeline_op_collector/timeline_op_collector.py b/profiler/advisor/dataset/timeline_op_collector/timeline_op_collector.py new file mode 100644 index 000000000..e1337e60c --- /dev/null +++ b/profiler/advisor/dataset/timeline_op_collector/timeline_op_collector.py @@ -0,0 +1,367 @@ +import logging +import os +from abc import abstractmethod, ABCMeta + +from profiler.advisor.common import constant as const +from profiler.advisor.common.timeline.event import TimelineEvent +from profiler.advisor.utils.utils import convert_to_float +from profiler.cluster_analyse.common_func.file_manager import FileManager + +logger = logging.getLogger() + + +class BaseOpCollector(metaclass=ABCMeta): + + def __init__(self): + self.attribute_to_dataset = {} + self.op_list = [] + self.require_filter_by_step = True + + @abstractmethod + def add_op(self): + """ add timeline event into self.op_list, and then will filter event in self.op_list by specific step + """ + pass + + @abstractmethod + def post_process(self): + """ convert self.op_list to required format like dict, set and so on and then record the final object into + self.attribute_to_dataset which used to set property of timeline event dataset + """ + pass + + +class StepCollector(BaseOpCollector): + KEY_WORD = "ProfilerStep" + + def __init__(self): + super().__init__() + self.require_filter_by_step = False + + def add_op(self, event): + if 
event.name.startswith(self.KEY_WORD): + self.op_list.append(event) + + def post_process(self, *args, **kwargs): + self.attribute_to_dataset["profiler_step"] = self.op_list + + +class OpCompileCollector(BaseOpCollector): + def __init__(self): + super().__init__() + self._total_op_compile_counter = 0 + self._total_op_compile_time = 0.0 + + @property + def total_time(self): + return self._total_op_compile_time + + @property + def total_count(self): + return self._total_op_compile_counter + + def is_empty(self): + return self._total_op_compile_counter == 0 + + def update(self, event: TimelineEvent): + self._total_op_compile_time += float(event.dur) + self._total_op_compile_counter += 1 + + def unset(self): + self._total_op_compile_counter = 0 + self._total_op_compile_time = 0.0 + + def add_op(self, event): + if event.name == const.OP_COMPILE_NAME or event.args.get("id") == const.OP_COMPILE_ID: + self.op_list.append(event) + + def post_process(self, target_op_list, **kwargs): + for op in target_op_list: + self.update(op) + + self.attribute_to_dataset["ops_compile"] = self + + +class SynchronizeStreamCollector(BaseOpCollector): + + def __init__(self): + super().__init__() + self._synchronize_stream_count = 0 + self._slow_synchronize_stream = [] + self.rule = SynchronizeStreamCollector._load_rule() + + @property + def total_count(self): + return self._synchronize_stream_count + + @property + def slow_synchronize_stream(self): + return self._slow_synchronize_stream + + @staticmethod + def _load_rule(): + sync_stream_rule_path = os.path.join( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))), + "rules", + "synchronize.yaml") + + sync_stream_rule = FileManager.read_yaml_file(sync_stream_rule_path) + return sync_stream_rule + + def update_sync_stream_count(self): + self._synchronize_stream_count += 1 + + def append_slow_sync_stream(self, event): + if float(event.dur) / 1000 >= self.rule.get("slow_synchronize_threshold", 10): + self._slow_synchronize_stream.append(event) + + def unset(self): + self._synchronize_stream_count = 0 + self._slow_synchronize_stream = [] + + def add_op(self, event): + return self.op_list + + def post_process(self, *args, **kwargs): + self.attribute_to_dataset["synchronize_stream"] = self + + +class MemCollector(BaseOpCollector): + MEMORY_OP_NAME = ["AscendCL@aclMallocMemInner", "AscendCL@aclrtFreePhysical"] + + def __init__(self): + super().__init__() + self.mem_op_info = {} + self.rule = self._load_rule() + + @staticmethod + def _load_rule(): + memory_rule_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))), + "rules", + "memory.yaml") + + memory_rule = FileManager.read_yaml_file(memory_rule_path) + return memory_rule + + def add_op(self, event): + if event.name not in self.MEMORY_OP_NAME: + return + self.op_list.append(event) + + def post_process(self, target_op_list, **kwargs): + for op in target_op_list: + if op.name not in self.mem_op_info: + self.mem_op_info[op.name] = dict(count=0, total_dur=0) + self.mem_op_info[op.name]["count"] += 1 + self.mem_op_info[op.name]["total_dur"] += float(op.dur) + + self.attribute_to_dataset["memory_ops"] = self + + +class DataloaderCollector(BaseOpCollector): + key_word = "dataloader" + + def __init__(self): + super().__init__() + + def add_op(self, event): + if self.key_word in event.name.lower(): + self.op_list.append(TimelineEvent({ + "name": event.name, "dataset_index": event.dataset_index, "ts": event.ts, "dur": event.dur, + "stack": event.args.get("Call 
stack") + })) + + def post_process(self, *args, **kwargs): + self.attribute_to_dataset["dataloader"] = self.op_list + + +class SyncBNCollector(BaseOpCollector): + key_word = "syncbatchnorm" + + def __init__(self): + super().__init__() + + def add_op(self, event): + if event.name.lower() == self.key_word: + self.op_list.append(TimelineEvent({ + "name": event.name, "dataset_index": event.dataset_index, "ts": event.ts, "dur": event.dur + })) + + def post_process(self, target_op_list, **kwargs): + self.attribute_to_dataset["sync_batchnorm"] = target_op_list + + +class AtenCollector(BaseOpCollector): + + def __init__(self): + super().__init__() + + def _add_aten(self, event: TimelineEvent): + self.op_list.append(TimelineEvent({ + "name": event.name, "dataset_index": event.dataset_index, "ts": event.ts, "dur": event.dur + })) + + def add_op(self, event): + if event.name.lower().startswith(f"{const.ATEN}{const.ATEN_SEP}") or event.name.lower().startswith( + f"{const.NPU}{const.ATEN_SEP}"): + self._add_aten(event) + return + + # 检查cann层同步操作,根据时间窗口索引到host侧的aten算子并给出堆栈 + if event.name.startswith(const.SYNC_STREAM): + self._add_aten(event) + + def post_process(self, target_op_list, **kwargs): + self.attribute_to_dataset["aten"] = target_op_list + + +class OptimizerCollector(BaseOpCollector): + + def __init__(self): + super().__init__() + + def add_op(self, event): + if event.name.startswith(f"{const.OPTIMIZER}.{const.OPTIMIZER_STEP}{const.OPTIMIZER_SEP}"): + self.op_list.append(TimelineEvent( + {"name": event.name, "dataset_index": event.dataset_index, "ts": event.ts, "dur": event.dur})) + + def post_process(self, target_op_list, **kwargs): + self.attribute_to_dataset["optimizer"] = target_op_list + + +class FrequencyCollector(BaseOpCollector): + KEY_WORD = "AI Core Freq" + + def __init__(self): + super().__init__() + self._previous_freq_index = -1 + + @staticmethod + def get_op_frequency(ai_core_ops, ai_core_freq): + ai_core_freq.sort(key=lambda x: float(x.ts)) + op_freq_record = {} + + op_index, freq_index = 0, 0 + while op_index < len(ai_core_ops) and freq_index < len(ai_core_freq): + op_event = ai_core_ops[op_index] + # if isinstance(op_event.ts, dict) or isinstance(op_event.ts, dur): + # print(op_event) + # raise + op_end_time = convert_to_float(op_event.ts) + convert_to_float(op_event.dur) + op_freq_list = [] + while freq_index < len(ai_core_freq): + freq_event = ai_core_freq[freq_index] + if convert_to_float(freq_event.end) < op_end_time: + op_freq_list.append(convert_to_float(freq_event.args.MHz)) + freq_index += 1 + continue + elif convert_to_float(freq_event.ts) < op_end_time: + if op_event.name not in op_freq_record: + op_freq_record[op_event.name] = {"count": 0, "dur": 0, "freq_list": []} + op_freq_record[op_event.name]["count"] += 1 + op_freq_record[op_event.name]["dur"] += convert_to_float(op_event.dur) + op_freq_list.append(convert_to_float(freq_event.args.MHz)) + op_freq_record[op_event.name]["freq_list"].append(min(op_freq_list)) + break + else: + break + + op_index += 1 + return op_freq_record + + def add_op(self, event): + import math + if event.name == self.KEY_WORD: + if self._previous_freq_index != -1: + self.op_list[self._previous_freq_index]["end"] = event.get("ts", float(math.inf)) + self._previous_freq_index += 1 + event.setdefault("end", float(math.inf)) + self.op_list.append(event) + + def post_process(self, target_op_list, **kwargs): + + ai_core_ops = kwargs.get("ai_core_ops", []) + if not ai_core_ops: + return + ai_core_ops.sort(key=lambda x: float(x.ts)) + op_freq = 
FrequencyCollector.get_op_frequency(ai_core_ops, target_op_list) + self.attribute_to_dataset["op_freq"] = op_freq + + +class SpecificTaskTypeOpCollector(BaseOpCollector): + + def __init__(self, op_type_list=[const.AI_CPU, const.AI_CORE, const.MIX_AIC]): + super().__init__() + self.op_type_list = op_type_list + + def add_op(self, event): + if event.args.get(const.TASK_TYPE) and event.args.get(const.TASK_TYPE) in self.op_type_list: + self.op_list.append( + TimelineEvent( + { + const.TASK_TYPE: event.args.get(const.TASK_TYPE), + "task_id": event.args.get("Task Id"), + "tid": event.tid, + "name": event.name, + "ts": str(event.ts), + "dur": str(event.dur) + } + ) + ) + + def post_process(self, target_op_list, **kwargs): + op_map = dict() + for op in target_op_list: + key = f"{op.name}-{op.ts}" + op_map[key] = op + + self.attribute_to_dataset["ops_with_task_type"] = op_map + self.attribute_to_dataset["task_op_names"] = list( + set([event_key.split("-")[0] for event_key in op_map.keys()])) + + +class TorchToNpuCollector(BaseOpCollector): + def __init__(self): + super().__init__() + + def add_op(self, event): + if event.name.lower() == const.TORCH_TO_NPU: + self.op_list.append(TimelineEvent({"tid": event.tid, "ts": str(event.ts), "ph": event.ph, "id": event.id})) + + def post_process(self, target_op_list, **kwargs): + op_map = dict() + for op in target_op_list: + key = f"{op.ph}-{op.id}" + op_map[key] = op + + self.attribute_to_dataset["torch_to_npu"] = op_map + + +class AclToNpuCollector(BaseOpCollector): + def __init__(self): + super().__init__() + + def add_op(self, event): + if event.name and event.ts and event.name == const.ACL_TO_NPU: + self.op_list.append(TimelineEvent({"ts": event.ts})) + + def post_process(self, target_op_list, **kwargs): + op_record = set(str(op.ts) for op in target_op_list) + self.attribute_to_dataset["acl_to_npu"] = op_record + + +class OpStackCollector(BaseOpCollector): + def __init__(self): + super().__init__() + + def add_op(self, event): + if event.args.get(const.CALL_STACKS): + self.op_list.append( + TimelineEvent({"name": event.name, "dataset_index": event.dataset_index, "ts": event.ts})) + + def post_process(self, target_op_list, **kwargs): + op_map = dict() + for op in target_op_list: + op_map[str(op.ts)] = op + + self.attribute_to_dataset["ops_with_stack"] = op_map diff --git a/profiler/advisor/display/html/templates/affinity_api.html b/profiler/advisor/display/html/templates/affinity_api.html index 2b4fa4532..e9f3dd29c 100644 --- a/profiler/advisor/display/html/templates/affinity_api.html +++ b/profiler/advisor/display/html/templates/affinity_api.html @@ -1,6 +1,6 @@ {% if result|length > 0 %}
-

Affinity API Issues

+

Affinity API Issues

The analysis results of following affinity APIs are based on runtime env cann-{{ cann_version }} diff --git a/profiler/advisor/display/html/templates/ai_core_frequency.html b/profiler/advisor/display/html/templates/ai_core_frequency.html index d04514203..9e5f34cef 100644 --- a/profiler/advisor/display/html/templates/ai_core_frequency.html +++ b/profiler/advisor/display/html/templates/ai_core_frequency.html @@ -1,6 +1,6 @@ {% if data|length > 0 %}
-

AI CORE Frequency Issues

+

AI CORE Frequency Issues

Issue: {{ desc }}
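Stepping away from the template tweaks for a moment, here is a hedged sketch of how a new collector is expected to plug into the BaseOpCollector framework added in timeline_op_collector.py above; the MatmulCollector class and the sample event values are invented for illustration, and the import paths assume the modules added by this patch:

```python
from profiler.advisor.common.timeline.event import TimelineEvent
from profiler.advisor.dataset.timeline_op_collector.timeline_op_collector import BaseOpCollector


class MatmulCollector(BaseOpCollector):
    """Toy collector: gather every timeline op whose name mentions matmul."""

    def add_op(self, event):
        if "matmul" in event.name.lower():
            self.op_list.append(event)

    def post_process(self, target_op_list, **kwargs):
        # Whatever is stored here becomes a property of the timeline dataset
        # via setattr() in BaseTimelineEventDataset._collector_post_process.
        self.attribute_to_dataset["matmul_ops"] = target_op_list


collector = MatmulCollector()
collector.add_op(TimelineEvent({"name": "aten::matmul", "ts": 1, "dur": 5}))
collector.post_process(collector.op_list)
print(len(collector.attribute_to_dataset["matmul_ops"]))  # 1
```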
diff --git a/profiler/advisor/display/html/templates/operator_ai_cpu.html b/profiler/advisor/display/html/templates/operator_ai_cpu.html index b3235a880..90f3bb93a 100644 --- a/profiler/advisor/display/html/templates/operator_ai_cpu.html +++ b/profiler/advisor/display/html/templates/operator_ai_cpu.html @@ -1,5 +1,5 @@
-

AICPU Issues

+

AICPU Issues

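A standalone sketch of the per-step filtering performed by BaseTimelineEventDataset._get_target_ops_by_step, using plain dicts instead of TimelineEvent; the helper name and the numbers are made up:

```python
def filter_ops_by_step(profiler_steps, ops, step_id):
    """Keep only ops whose [ts, ts + dur) window lies inside ProfilerStep#<step_id>."""
    target_name = f"ProfilerStep#{step_id}"
    for step in profiler_steps:
        if step["name"] != target_name:
            continue
        start = float(step["ts"])
        end = start + float(step["dur"])
        inside = [op for op in ops
                  if start <= float(op["ts"]) and float(op["ts"]) + float(op["dur"]) <= end]
        return sorted(inside, key=lambda op: float(op["ts"]))
    # Unknown or unset step id: fall back to all ops, as the dataset does.
    return ops


steps = [{"name": "ProfilerStep#1", "ts": 0, "dur": 100}]
ops = [{"name": "aten::add", "ts": 10, "dur": 5},
       {"name": "aten::mul", "ts": 150, "dur": 5}]
print(filter_ops_by_step(steps, ops, 1))  # only aten::add falls inside step 1
```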
diff --git a/profiler/advisor/display/html/templates/operator_block_dim.html b/profiler/advisor/display/html/templates/operator_block_dim.html index 4e2c832f6..8079db30a 100644 --- a/profiler/advisor/display/html/templates/operator_block_dim.html +++ b/profiler/advisor/display/html/templates/operator_block_dim.html @@ -1,5 +1,5 @@
-

Block Dim Issues

+

Block Dim Issues

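FrequencyCollector.get_op_frequency pairs each AI Core op with the frequency samples that overlap it; a small hedged usage sketch follows. The op name and the MHz value are invented, and the import paths assume the modules added by this patch:

```python
from profiler.advisor.common.timeline.event import TimelineEvent
from profiler.advisor.dataset.timeline_op_collector.timeline_op_collector import FrequencyCollector

# One frequency sample covering [0, 100) at 800 MHz and one op running in [10, 30).
freq_samples = [TimelineEvent({"name": "AI Core Freq", "ts": 0, "end": 100,
                               "args": TimelineEvent({"MHz": 800})})]
ai_core_ops = [TimelineEvent({"name": "MatMul", "ts": 10, "dur": 20})]

op_freq = FrequencyCollector.get_op_frequency(ai_core_ops, freq_samples)
print(op_freq)  # roughly {'MatMul': {'count': 1, 'dur': 20.0, 'freq_list': [800.0]}}
```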
diff --git a/profiler/advisor/display/html/templates/operator_dispatch.html b/profiler/advisor/display/html/templates/operator_dispatch.html index c80508635..7aa69251e 100644 --- a/profiler/advisor/display/html/templates/operator_dispatch.html +++ b/profiler/advisor/display/html/templates/operator_dispatch.html @@ -1,6 +1,6 @@ {% if optimizers|length > 0 %}
-

Operator Dispatch Issues

+

Operator Dispatch Issues

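Likewise, a short sketch of feeding a memory op through the new MemCollector; it assumes the advisor rules directory is available on disk (the constructor loads memory.yaml), and the duration value is invented:

```python
from profiler.advisor.common.timeline.event import TimelineEvent
from profiler.advisor.dataset.timeline_op_collector.timeline_op_collector import MemCollector

collector = MemCollector()
collector.add_op(TimelineEvent({"name": "AscendCL@aclMallocMemInner", "dur": 10}))
collector.post_process(collector.op_list)

# Aggregated counts and durations land in mem_op_info and are exposed to the
# dataset under the "memory_ops" property.
print(collector.mem_op_info)  # {'AscendCL@aclMallocMemInner': {'count': 1, 'total_dur': 10.0}}
```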
diff --git a/profiler/advisor/display/html/templates/operator_dynamic_shape.html b/profiler/advisor/display/html/templates/operator_dynamic_shape.html index 59920b6c9..9d4ca028b 100644 --- a/profiler/advisor/display/html/templates/operator_dynamic_shape.html +++ b/profiler/advisor/display/html/templates/operator_dynamic_shape.html @@ -1,5 +1,5 @@
-

Operator Dynamic Shape Issues

+

Operator Dynamic Shape Issues

diff --git a/profiler/advisor/display/html/templates/operator_no_bound.html b/profiler/advisor/display/html/templates/operator_no_bound.html index cfbd20baa..d0b8925cb 100644 --- a/profiler/advisor/display/html/templates/operator_no_bound.html +++ b/profiler/advisor/display/html/templates/operator_no_bound.html @@ -1,5 +1,5 @@
-

Operator No Bound Issues

+

Operator No Bound Issues

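The utils.py hunk further below adds a safe_index helper that returns a fallback value instead of raising ValueError; a quick usage sketch with made-up header names:

```python
from profiler.advisor.utils.utils import safe_index

headers = ["step", "rank_id", "compute", "communication"]
print(safe_index(headers, "rank_id"))      # 1
print(safe_index(headers, "free"))         # None when the value is absent
print(safe_index(headers, "free", -1))     # -1 with an explicit fallback
```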
diff --git a/profiler/advisor/utils/obs_utils.py b/profiler/advisor/utils/obs_utils.py deleted file mode 100644 index 4d5095923..000000000 --- a/profiler/advisor/utils/obs_utils.py +++ /dev/null @@ -1,129 +0,0 @@ -import os -import re -import shutil -import logging - -from profiler.advisor.common import constant as const -from profiler.advisor.utils.utils import ParallelJob, singleton - -try: - import moxing as mox - - enable_mox = True -except ModuleNotFoundError: - enable_mox = False - -logger = logging.getLogger() - - -class MoxUtils: - @staticmethod - def data_copy(src_dir, dst_dir, is_dir=False): - if is_dir: - mox.file.copy_parallel(src_dir, dst_dir) - else: - mox.file.copy(src_dir, dst_dir) - - @staticmethod - def check_file_exist(file_path): - return mox.file.exists(file_path) - - @staticmethod - def check_dir_exist(dir_path): - return mox.file.is_directory(dir_path) - - @staticmethod - def list_directory(dir_path, recursive=False): - return mox.file.list_directory(dir_path, recursive) - - -@singleton -class ObsProfilingFileHandler: - - def __init__(self, collection_path): - - self.collection_path = collection_path.rstrip("/") - self.files = MoxUtils.list_directory(collection_path, recursive=True) - self.local_output_dir = os.path.join(os.getenv(const.ANALYSIS_OUTPUT_PATH) or os.getcwd(), - const.ADVISOR_ANALYSIS_OUTPUT_DIR, - "profiling_for_cluster_analysis", - os.path.basename(self.collection_path)) - self._tmp_dir = os.path.join(self.local_output_dir, "tmp") - self._rank_id_local_path = {} - self._rank_id_list = self._get_rank_id_list() - - @property - def rank_ids(self): - return self._rank_id_list - - def download_files_by_pattern(self, file_name_pattern_list): - logger.debug("Start to download profiling file with file name match the pattern %s", file_name_pattern_list) - download_files = self._get_download_file_path(file_name_pattern_list) - - src_file_list = [os.path.join(self.collection_path, file) for file in download_files] - dst_file_list = [os.path.join(self._tmp_dir, file) for file in download_files] - - parallel_download_job = ParallelJob( - MoxUtils.data_copy, - list(zip(src_file_list, dst_file_list)), - "Downloading files for cluster analysis" - ) - - parallel_download_job.start(const.DEFAULT_PROCESSES) - - def download_rank_profiling_files(self, rank_id_list): - logger.debug("Start to download profiling files for rank %s", rank_id_list) - for file in self.files: - for rank_id in rank_id_list: - if os.path.basename(file) != f"profiler_info_{rank_id}.json": - continue - - obs_dir_path = os.path.dirname(os.path.join(self.collection_path, file)) - local_dir_path = os.path.join(self.local_output_dir, os.path.basename(obs_dir_path)) - MoxUtils.data_copy(obs_dir_path, local_dir_path, is_dir=True) - self._rank_id_local_path[rank_id] = local_dir_path - - def get_cluster_analysis_local_path(self): - return self._tmp_dir - - def get_rank_profiling_local_path(self, rank_id): - return self._rank_id_local_path.get(rank_id) - - def remove_tmp_dir(self): - if os.path.exists(self._tmp_dir): - shutil.rmtree(self._tmp_dir) - - def _get_rank_id_list(self): - rank_id_list = [] - for file in self.files: - match = re.search(const.PROFILER_INFO_FILE_PATTERN, file) - if match: - rank_id = int(match.group(1)) - rank_id_list.append(rank_id) - return sorted(rank_id_list) - - def _get_download_file_path(self, file_name_pattern_list): - i, j = 0, 0 - rank_file_collector = [] - while i < len(self.files): - file_path = self.files[i] - if const.ASCEND_PROFILER_OUTPUT not in file_path.split("/"): - 
i += 1 - continue - - index = file_path.split("/").index(const.ASCEND_PROFILER_OUTPUT) - rank_root_dir = "/".join(file_path.split("/")[:index]) - - j = i - - while True: - - if j >= len(self.files) or not self.files[j].startswith(rank_root_dir): - i = j - break - - for file_name_pattern in file_name_pattern_list: - if re.search(file_name_pattern, self.files[j]): - rank_file_collector.append(self.files[j]) - j += 1 - return rank_file_collector diff --git a/profiler/advisor/utils/utils.py b/profiler/advisor/utils/utils.py index 6d943925f..6445b40e3 100644 --- a/profiler/advisor/utils/utils.py +++ b/profiler/advisor/utils/utils.py @@ -102,7 +102,7 @@ def singleton(cls): for base_class in inspect.getmro(cls)[::-1]: # 获取类的所有成员 members = inspect.getmembers(base_class) - + # 过滤出函数对象 function_objs = [member[1] for member in members if inspect.isfunction(member[1]) or inspect.ismethod(member[1])] for function_obj in function_objs: @@ -610,6 +610,12 @@ def convert_to_float(num): try: return float(num) except (ValueError, FloatingPointError): - logger.error(f"Can not convert %ss to float", num) - pass + logger.error(f"Can not convert %s to float", num) return 0 + + +def safe_index(array, value, return_index_if_error=None): + if value in array: + return array.index(value) + + return return_index_if_error \ No newline at end of file diff --git a/profiler/test/ut/advisor/advisor_backend/timeline_advice/test_memory_op_checker.py b/profiler/test/ut/advisor/advisor_backend/timeline_advice/test_memory_op_checker.py new file mode 100644 index 000000000..cdb59906d --- /dev/null +++ b/profiler/test/ut/advisor/advisor_backend/timeline_advice/test_memory_op_checker.py @@ -0,0 +1,62 @@ +import unittest +import os +import sys +import yaml + +from profiler.advisor.analyzer.memory.memory_checker import MemoryOpsChecker +from profiler.advisor.common.timeline.event import TimelineEvent +from profiler.test.ut.advisor.advisor_backend.tools.tool import recover_env + + +class TestMemOpChecker(unittest.TestCase): + @classmethod + def tearDownClass(cls) -> None: + recover_env() + + def setUp(self) -> None: + rule_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))))), + "advisor", "rules", "memory.yaml") + + with open(rule_path, "rb") as file: + self.rule = yaml.safe_load(file) + + def test_no_mem_op(self): + dataset = self._get_mock_dataset(1, is_empty_dataset=True) + + checker = MemoryOpsChecker() + checker.check_memory_ops(dataset) + self.assertFalse(checker.memory_issues) + + def test_mem_op_not_reach_threshold(self): + dataset = self._get_mock_dataset(1, is_empty_dataset=False) + + checker = MemoryOpsChecker() + checker.check_memory_ops(dataset) + self.assertFalse(checker.memory_issues) + + def test_mem_op_reach_threshold(self): + dataset = self._get_mock_dataset(1, 1000000, is_empty_dataset=False) + + checker = MemoryOpsChecker() + checker.check_memory_ops(dataset) + self.assertTrue(checker.memory_issues) + + def _get_mock_dataset(self, mem_op_num, mem_op_total_dur=10000, is_empty_dataset=False): + dataset = TimelineEvent() + if is_empty_dataset: + return dataset + + mem_op_info = TimelineEvent() + for i in range(mem_op_num): + mem_op_info[f"mock_mem_op_{i}"] = TimelineEvent({"total_dur": mem_op_total_dur, "count": 10}) + + dataset["memory_ops"] = TimelineEvent({"mem_op_info": mem_op_info, "rule": TimelineEvent(self.rule)}) + return dataset + + +if __name__ == '__main__': + tester = TestMemOpChecker() + 
tester.test_no_mem_op() + tester.test_mem_op_not_reach_threshold() + tester.test_mem_op_reach_threshold() \ No newline at end of file diff --git a/profiler/test/ut/advisor/advisor_backend/timeline_advice/test_timeline_op_collector.py b/profiler/test/ut/advisor/advisor_backend/timeline_advice/test_timeline_op_collector.py new file mode 100644 index 000000000..3e1add689 --- /dev/null +++ b/profiler/test/ut/advisor/advisor_backend/timeline_advice/test_timeline_op_collector.py @@ -0,0 +1,144 @@ +import unittest +import os +import sys +import yaml + +from profiler.advisor.dataset.timeline_op_collector.timeline_op_collector import ( + OpCompileCollector, + SynchronizeStreamCollector, + MemCollector, + DataloaderCollector, + SyncBNCollector, + AtenCollector, + OptimizerCollector, + FrequencyCollector, + SpecificTaskTypeOpCollector, + TorchToNpuCollector, + AclToNpuCollector, + OpStackCollector, + StepCollector +) +from profiler.advisor.common.timeline.event import TimelineEvent +from profiler.test.ut.advisor.advisor_backend.tools.tool import recover_env + + +class TestTimelineOpCollector(unittest.TestCase): + @classmethod + def tearDownClass(cls) -> None: + recover_env() + + def setUp(self) -> None: + self.mock_step_event = TimelineEvent(dict(name="ProfilerStep#1", ts=1, dur=1000)) + self.mock_op_compile_event = TimelineEvent(dict(name="AscendCL@aclopCompileAndExecute", ts=2, dur=1)) + self.mock_sync_stream_event = TimelineEvent(dict(name="AscendCL@aclrtSynchronizeStream", dur=1000000000)) + self.mock_mem_op_event = TimelineEvent(dict(name="AscendCL@aclMallocMemInner", dur=10)) + self.mock_dataloader_event = TimelineEvent(dict(name="dataloader")) + self.mock_sync_bn_event = TimelineEvent(dict(name="syncbatchnorm")) + self.mock_aten_event = TimelineEvent(dict(name="aten::conv3d")) + self.mock_optimizer_event = TimelineEvent(dict(name="Optimizer.step#")) + self.mock_AI_CPU_event = TimelineEvent( + {"name": "index", "args": TimelineEvent({"Task Type": "AI_CPU"}), "ts": 1}) + self.mock_torch_to_npu_event = TimelineEvent(dict(name="torch_to_npu", tid=1, ts=1, ph=1, id=1)) + self.mock_acl_to_npu_event = TimelineEvent(dict(name="acl_to_npu", ts=1)) + self.mock_op_stack_event = TimelineEvent( + {"name": "aten::conv3d", "dataset_index": 1, "ts": 1, "args": TimelineEvent({"Call stack": "mock_stack"})}) + + def test_step_collector(self): + step_collector = StepCollector() + step_collector.add_op(self.mock_step_event) + step_collector.post_process() + self.assertEqual(step_collector.attribute_to_dataset.get("profiler_step"), [self.mock_step_event]) + + def test_op_compile_collector(self): + op_compile_collector = OpCompileCollector() + op_compile_collector.add_op(self.mock_op_compile_event) + op_compile_collector.post_process(op_compile_collector.op_list) + self.assertEqual(op_compile_collector.attribute_to_dataset.get("ops_compile"), op_compile_collector) + self.assertEqual(op_compile_collector.total_time, 1) + self.assertEqual(op_compile_collector.total_count, 1) + + def test_sync_stream_collector(self): + sync_stream_collector = SynchronizeStreamCollector() + sync_stream_collector.post_process() + self.assertEqual(sync_stream_collector.attribute_to_dataset.get("synchronize_stream"), sync_stream_collector) + + sync_stream_collector.append_slow_sync_stream(self.mock_sync_stream_event) + self.assertEqual(len(sync_stream_collector._slow_synchronize_stream), 1) + + def test_mem_op_collector(self): + mem_op_collector = MemCollector() + mem_op_collector.add_op(self.mock_mem_op_event) + 
mem_op_collector.post_process(mem_op_collector.op_list) + self.assertEqual(mem_op_collector.attribute_to_dataset.get("memory_ops"), mem_op_collector) + self.assertEqual(mem_op_collector.mem_op_info.get("AscendCL@aclMallocMemInner"), {"count": 1, "total_dur": 10}) + + def test_dataloader_collector(self): + dataloader_collector = DataloaderCollector() + dataloader_collector.add_op(self.mock_dataloader_event) + dataloader_collector.post_process() + self.assertEqual(len(dataloader_collector.attribute_to_dataset.get("dataloader")), 1) + + def test_sync_bn_collector(self): + sync_bn_collector = SyncBNCollector() + sync_bn_collector.add_op(self.mock_sync_bn_event) + sync_bn_collector.post_process(sync_bn_collector.op_list) + self.assertEqual(len(sync_bn_collector.attribute_to_dataset.get("sync_batchnorm")), 1) + + def test_aten_collector(self): + aten_collector = AtenCollector() + aten_collector.add_op(self.mock_aten_event) + aten_collector.add_op(self.mock_sync_stream_event) + aten_collector.post_process(aten_collector.op_list) + self.assertEqual(len(aten_collector.attribute_to_dataset.get("aten")), 2) + + def test_optimizer_collector(self): + optimizer_collector = OptimizerCollector() + optimizer_collector.add_op(self.mock_optimizer_event) + optimizer_collector.post_process(optimizer_collector.op_list) + self.assertEqual(len(optimizer_collector.attribute_to_dataset.get("optimizer")), 1) + + def test_specific_task_type_op_collector(self): + specific_task_type_op_collector = SpecificTaskTypeOpCollector() + specific_task_type_op_collector.add_op(self.mock_AI_CPU_event) + specific_task_type_op_collector.post_process(specific_task_type_op_collector.op_list) + key = f"{self.mock_AI_CPU_event.name}-{self.mock_AI_CPU_event.ts}" + self.assertTrue( + specific_task_type_op_collector.attribute_to_dataset.get("ops_with_task_type", {}).get(key)) + self.assertTrue(specific_task_type_op_collector.attribute_to_dataset.get("task_op_names"), [key]) + + def test_torch_to_npu_collector(self): + torch_to_npu_collector = TorchToNpuCollector() + torch_to_npu_collector.add_op(self.mock_torch_to_npu_event) + torch_to_npu_collector.post_process(torch_to_npu_collector.op_list) + key = f"{self.mock_torch_to_npu_event.ph}-{self.mock_torch_to_npu_event.id}" + self.assertTrue("1-1" in torch_to_npu_collector.attribute_to_dataset.get("torch_to_npu")) + + def test_acl_to_npu_collector(self): + acl_to_npu_collector = AclToNpuCollector() + acl_to_npu_collector.add_op(self.mock_acl_to_npu_event) + acl_to_npu_collector.post_process(acl_to_npu_collector.op_list) + self.assertEqual(acl_to_npu_collector.attribute_to_dataset.get("acl_to_npu"), + set([str(self.mock_acl_to_npu_event.ts)])) + + def test_op_stack_collector(self): + op_stack_collector = OpStackCollector() + op_stack_collector.add_op(self.mock_op_stack_event) + op_stack_collector.post_process(op_stack_collector.op_list) + self.assertTrue( + str(self.mock_op_stack_event.ts) in op_stack_collector.attribute_to_dataset.get("ops_with_stack")) + + +if __name__ == '__main__': + tester = TestTimelineOpCollector() + tester.test_step_collector() + tester.test_op_compile_collector() + tester.test_sync_stream_collector() + tester.test_mem_op_collector() + tester.test_dataloader_collector() + tester.test_sync_bn_collector() + tester.test_aten_collector() + tester.test_optimizer_collector() + tester.test_specific_task_type_op_collector() + tester.test_torch_to_npu_collector() + tester.test_acl_to_npu_collector() + tester.test_op_stack_collector() -- Gitee
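Finally, a hedged sketch of running the two new test modules through the unittest loader, so that setUp() executes before every case (the __main__ blocks above call the test methods directly, which does not run setUp); the module paths are taken from the files added by this patch:

```python
import unittest

suite = unittest.defaultTestLoader.loadTestsFromNames([
    "profiler.test.ut.advisor.advisor_backend.timeline_advice.test_timeline_op_collector",
    "profiler.test.ut.advisor.advisor_backend.timeline_advice.test_memory_op_checker",
])
unittest.TextTestRunner(verbosity=2).run(suite)
```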