From 2d63996b8b86827a0d3ed77d90f5ce2b2168f7fb Mon Sep 17 00:00:00 2001 From: fanxiaotong Date: Thu, 23 May 2024 10:21:08 +0800 Subject: [PATCH] merge --- .../advisor/analyzer/computation/__init__.py | 0 .../analyzer/computation/aicpu/__init__.py | 0 .../computation/aicpu/aicpu_checker.py | 278 +++++++++++++++++ .../analyzer/computation/bound/__init__.py | 0 .../computation/bound/block_dim_checker.py | 77 +++++ .../bound/operator_bound_checker.py | 56 ++++ .../computation/op_compile/__init__.py | 0 .../op_compile/dynamic_shape_checker.py | 91 ++++++ .../analyzer/computation/operator_checker.py | 282 ++++++++++++++++++ .../computation/profiling_analyzer.py | 71 +++++ .../advisor/analyzer/dataloader/__init__.py | 0 11 files changed, 855 insertions(+) create mode 100644 profiler/advisor/analyzer/computation/__init__.py create mode 100644 profiler/advisor/analyzer/computation/aicpu/__init__.py create mode 100644 profiler/advisor/analyzer/computation/aicpu/aicpu_checker.py create mode 100644 profiler/advisor/analyzer/computation/bound/__init__.py create mode 100644 profiler/advisor/analyzer/computation/bound/block_dim_checker.py create mode 100644 profiler/advisor/analyzer/computation/bound/operator_bound_checker.py create mode 100644 profiler/advisor/analyzer/computation/op_compile/__init__.py create mode 100644 profiler/advisor/analyzer/computation/op_compile/dynamic_shape_checker.py create mode 100644 profiler/advisor/analyzer/computation/operator_checker.py create mode 100644 profiler/advisor/analyzer/computation/profiling_analyzer.py create mode 100644 profiler/advisor/analyzer/dataloader/__init__.py diff --git a/profiler/advisor/analyzer/computation/__init__.py b/profiler/advisor/analyzer/computation/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/profiler/advisor/analyzer/computation/aicpu/__init__.py b/profiler/advisor/analyzer/computation/aicpu/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git 
a/profiler/advisor/analyzer/computation/aicpu/aicpu_checker.py b/profiler/advisor/analyzer/computation/aicpu/aicpu_checker.py new file mode 100644 index 000000000..4eca1c6c0 --- /dev/null +++ b/profiler/advisor/analyzer/computation/aicpu/aicpu_checker.py @@ -0,0 +1,278 @@ +import copy +import os +from functools import partial +from typing import List, Dict, Optional + +import yaml +from profiler.advisor.analyzer.computation.operator_checker import OperatorChecker, logger +from profiler.advisor.analyzer.schedule.fusion_ops.timeline_api_stack_checker import OpStackFinder +from profiler.advisor.common import constant +from profiler.advisor.dataset.dataset import Dataset +from profiler.advisor.dataset.profiling.profiling_dataset import ProfilingDataset +from profiler.advisor.dataset.timeline_event_dataset import TimelineEventDataset + + +class AicpuChecker(OperatorChecker): + _CHECKER = "aicpu operator" + _PROBLEM = "AICPU operator" + _MIN_TASK_DURATION = 20 + _description = f"Some operators and task duration exceed {_MIN_TASK_DURATION} us, such as :\n" + _SUGGESTION: List[str] = ["Modify code to avoid aicpu operator"] + STACK_INFO_ITEMS = "stack_info" + SUGGESTION_INFO_ITEMS = "suggestions" + _ITEMS = [ + "op_name", "op_type", "task_duration", "input_shapes", "input_data_types", "input_formats", "output_shapes", + "output_data_types", "output_formats" + ] + + def __init__(self, cann_version): + super(AicpuChecker, self).__init__(cann_version=cann_version) + self.aicpu_rules: Dict = {} + self.aicpu_checker: Dict = {} + self.load_aicpu_rules() + + def _check_data(self, profiling_data: ProfilingDataset) -> bool: + if not self._check_summary(profiling_data): + return False + return True + + def _check_operator(self, op_info) -> bool: + return op_info.task_type == constant.AI_CPU + + def load_aicpu_rules(self, rule_path="rules/aicpu_rules.yaml") -> Dict: + if not os.path.isabs(rule_path): + rule_path = os.path.join(os.path.dirname(__file__), + "../../../", rule_path) + + 
if not os.path.exists(rule_path): + logger.warning("Skip analyze aicpu issues, because %s does not exist.", rule_path) + return {} + with open(rule_path, 'r') as f: + self.aicpu_rules = yaml.safe_load(f) + self.filter_aicpu_rules(self.aicpu_rules) + for checker_name, check_rule in self.aicpu_rules.items(): + if not isinstance(check_rule, (list, dict,)): + continue + + if checker_name not in AICPU_CHECKER.keys(): + logger.warning("Skip %s, which is not support now.", checker_name) + continue + + self.aicpu_checker[checker_name] = AICPU_CHECKER[checker_name](check_rule) + + def filter_aicpu_rules(self, aicpu_rules): + support_checkers = [] + for checkers in aicpu_rules['CommonChecker']: + for key, value in checkers.items(): + if key == 'DataTypeChecker' and self.cann_version in value['cann_version']: + support_checkers.append(checkers) + aicpu_rules['CommonChecker'] = support_checkers + return + + def check_aicpu_attr(self, op_info) -> List[str]: + suggestions = [] + for _, checker in self.aicpu_checker.items(): + suggestions.extend(checker.check(op_info)) + return suggestions + + def check(self, profiling_data: ProfilingDataset) -> bool: + """ + check if any operator need optimize + :param profiling_data: profiling datasest + :return: true or false + """ + + if not self._check_data(profiling_data): + return False + op_summary = profiling_data.op_summary + + def get_opeartor_stack_info(api_stack_finder: OpStackFinder, op_name_list: list) -> list: + data: Dict[str, Dataset] = {} + event_dataset = TimelineEventDataset(collection_path=profiling_data.collection_path, data=data, task_type=constant.AI_CPU) + + # disable multiprocessing, avoid cost time of enable new process for light task + api_stack_finder.get_api_stack_by_op(event_dataset, op_name_list, constant.AI_CPU, + disable_multiprocess=True) + return api_stack_finder._stack_record + + self._op_list = [] + total_task_duration = 0.0 + max_task_duration = 0.0 + for op_info in op_summary.op_list: + if 
self._check_operator(op_info): + self._op_list.append(op_info) + + task_duration = float(op_info.task_duration) + total_task_duration += task_duration + max_task_duration = max(max_task_duration, task_duration) + if (not self._op_list) or (max_task_duration < self._MIN_TASK_DURATION): + return False + + # 获取所有算子堆栈的信息 + op_name_list = [] + for op in self._op_list: + if op.op_name not in op_name_list: + op_name_list.append(op.op_name) + api_stack_finder = OpStackFinder() + stack_record = get_opeartor_stack_info(api_stack_finder, op_name_list) + + # task_id 到 stack 信息的对应 + self._op_list.sort(key=lambda x: int(x.task_id)) + stack_record.sort(key=lambda x: x[0]) + task_id_to_stack = dict() + for stack in stack_record: + task_id_to_stack[stack[0]] = stack[-1] + + # 算子追加堆栈属性 + for op in self._op_list: + stack = task_id_to_stack.get(int(op.task_id)) + op.add_attr(self.STACK_INFO_ITEMS, stack) + suggestions = self.check_aicpu_attr(op) + op.add_attr(self.SUGGESTION_INFO_ITEMS, suggestions) + + # double 类型算子判断 + double_type_ai_cpu_operator = [] + for op in self._op_list: + if not op.has_attr("input_data_types"): + logger.warning( + "Skip checking of input data in AICPU checker because of not containing input_data_dtypes in op summary") + break + if op.has_attr( + "input_data_types") and "DOUBLE" in op.input_data_types and op.op_name not in double_type_ai_cpu_operator: + double_type_ai_cpu_operator.append(op.op_name) + if bool(double_type_ai_cpu_operator): + self._SUGGESTION.append("Try to convert double type operator to float, such as {}".format( + ",".join(double_type_ai_cpu_operator))) + return True + + def make_render(self, html_render, record): + html_render.render_template(key="computation", + template_dir="templates", + template_name="operator_ai_cpu.html", + format_result=self.format_operator_result(record, constant.OPERATOR_LIST_UNLIMIT)) + + def format_operator_result(self, record, limit): + """ + Format operator result to html + :param record: profiling check record 
+ :param limit: Limit number of operator statistics lists. + :return: + """ + optimization_item = record.optimization_item + release_suggestion_list = [] + for suggestion in optimization_item.suggestion: + release_suggestion_list.append(suggestion.replace('\n', '
')) + logger.debug("suggestion list is %s", release_suggestion_list) + format_result = {"record": record.__dict__, "suggestion": '
'.join(release_suggestion_list), + "task_duration": round(record.statistics_item.task_duration, 2)} + + statistic = self.group_by(copy.deepcopy(self._op_list), op_key='op_type', + limit=limit) + format_result["statistic"] = statistic + stack_key_list = ["stack_info", "input_data_types", "output_data_types"] + if statistic: + for key, info in statistic: + op_info_list = self.group_by_list(info.get("op_info_list"), stack_key_list, limit) + info["op_info_list"] = op_info_list + return format_result + + def group_by_list(self, op_list, op_key_list: List = ["stack_info", "input_data_types", "output_data_types"], + limit: int = constant.OPERATOR_LIST_UNLIMIT): + if op_list is None: + op_list = [] + + # op_key_list 合并添加合并的属性,作为 groupby 的 key value + op_key = '+'.join(op_key_list) # str, json + for op_info in op_list: + attribute = "" + for _op in op_key_list: + if op_info.get_attr(_op): + attribute += op_info.get_attr(_op) + op_info.add_attr(op_key, attribute) + + return self.group_by(op_list, op_key=op_key, limit=limit) + + +class BaserChecker: + def __init__(self, *args, **kwargs): + self.checker_list = [] + + def build(self): + raise NotImplementedError + + def check(self, op_info) -> List[str]: + suggestions = [] + for checker in self.checker_list: + suggestion = checker(op_info) + if suggestion is not None: + suggestions.append(suggestion) + return suggestions + + +class CommonChecker(BaserChecker): + def __init__(self, check_rules: List[Dict] = None): + super(CommonChecker, self).__init__() + self.check_rules = check_rules if check_rules is not None else [] + self.supported_checker = dict(DataTypeChecker=self.datatype_checker) + self.build() + + @staticmethod + def datatype_checker(check_item: Dict, op_info) -> Optional[str]: + supported_op_type = check_item.get('op_type', []) + suggestion = check_item.get('suggestion', "") + valid_inputs = check_item.get('input', []) + valid_outputs = check_item.get('output', []) + ignore_type = check_item.get('ignore_type', []) + 
op_type = getattr(op_info, 'op_type', "UNKNOWN") + if "__ALL__" in supported_op_type or \ + op_type.lower() in supported_op_type: + if op_type.lower() in ignore_type: + return None + + op_input_dtype = getattr(op_info, 'input_data_types', "").split(";") + op_input_dtype = [item.lower() for item in op_input_dtype] + op_output_dtype = getattr(op_info, 'output_data_types', "").split(";") + op_output_dtype = [item.lower() for item in op_output_dtype] + input_dtype_diff = set(op_input_dtype).difference(set(valid_inputs)) + output_dtype_diff = set(op_output_dtype).difference(set(valid_outputs)) + unsupported_dtype_diff = input_dtype_diff.union(output_dtype_diff) + if not unsupported_dtype_diff: + return None + + return suggestion.format(",".join(unsupported_dtype_diff).upper(), + op_type, + ",".join(valid_inputs).upper()) + + def build(self): + for check in self.check_rules: + (check_func, check_rule), = check.items() + if check_func not in self.supported_checker: + logger.warning("Skip %s, which has not been implemented.", check_func) + continue + self.checker_list.append(partial(self.supported_checker.get(check_func), check_rule)) + + +class ExampleGuideChecker(BaserChecker): + def __init__(self, check_rules: List[Dict] = None): + super(ExampleGuideChecker, self).__init__() + self.check_rules = check_rules if check_rules is not None else [] + self.build() + + def build(self): + def _guide_url(check_item: Dict, op_info) -> Optional[str]: + supported_op_type = check_item.get('op_type', []) + url = check_item.get('url', "") + suggestion = check_item.get('suggestion', "") + + if getattr(op_info, 'op_type', "UNKNOWN").lower() in supported_op_type: + return suggestion if "{}" not in suggestion else suggestion.format(url) + + for check in self.check_rules: + (_, check_rule), = check.items() + self.checker_list.append(partial(_guide_url, check_rule)) + + +AICPU_CHECKER = { + "CommonChecker": CommonChecker, + "ExampleGuideChecker": ExampleGuideChecker +} diff --git 
a/profiler/advisor/analyzer/computation/bound/__init__.py b/profiler/advisor/analyzer/computation/bound/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/profiler/advisor/analyzer/computation/bound/block_dim_checker.py b/profiler/advisor/analyzer/computation/bound/block_dim_checker.py new file mode 100644 index 000000000..d90ef56c7 --- /dev/null +++ b/profiler/advisor/analyzer/computation/bound/block_dim_checker.py @@ -0,0 +1,77 @@ +import logging + +from typing import List + +from profiler.advisor.analyzer.computation.operator_checker import OperatorChecker +from profiler.advisor.common import constant +from profiler.advisor.config.config import Config +from profiler.advisor.dataset.profiling.profiling_dataset import ProfilingDataset + +logger = logging.getLogger() + + +class BlockDimChecker(OperatorChecker): + _SUGGESTION: List[str] = [] + _CHECKER = "block dim" + _PROBLEM = "block dim" + _description = "some operator does not make full use of {} ai core" + _ITEMS = [ + "op_name", "op_type", "task_type", "task_duration", "income", "block_dim", "mix_block_dim", "input_shapes", + "input_data_types", "input_formats", "output_shapes", "output_data_types", "output_formats" + ] + + def _check_data(self, data): + if not self._check_summary(data): + return False + if not Config().get_config("ai_core_num"): + logger.warning(self.SKIP_CHECK_MSG, self._CHECKER, "ai core num in info.json file") + return False + summary = data.op_summary + op_info = summary.op_list[0] + if not hasattr(op_info, "block_dim"): + logger.warning(self.SKIP_CHECK_MSG, self._CHECKER, "block dim in op summary") + return False + if Config().get_config("ai_core_num"): + self._aicore_num = int(Config().get_config("ai_core_num")) + if Config().get_config("aiv_num"): + self._aiv_num = int(Config().get_config("aiv_num")) + self._description = self._description.format(self._aicore_num) + if self._aiv_num: + self._description += f" or {self._aiv_num} ai vector core" + self._description 
+= f";\n Top-{OperatorChecker._MAX_TUNE_OP_NUM} operator of " \ + "task duration are as follows:\n" + return True + + def make_render(self, html_render, record): + html_render.render_template(key="computation", + template_dir="templates", + template_name="operator_block_dim.html", + format_result=self.format_operator_result(record, constant.OPERATOR_OUT_TOPK)) + + def _check_operator(self, op_info) -> bool: + if op_info.task_type not in ["AI_CORE", "AI_VECTOR_CORE", "MIX_AIC"]: + return False + block_dim = int(op_info.block_dim) + core_num = self.get_core_num(op_info) + if block_dim % core_num == 0: + return False + if op_info.task_type == "MIX_AIC" and hasattr(op_info, "mix_block_dim") \ + and self._aiv_num and int(op_info.mix_block_dim) % self._aiv_num == 0: + return False + return True + + def get_core_num(self, op_info): + """ + get core num of task type + """ + if op_info.task_type == "AI_CORE" or not self._aiv_num: + core_num = self._aicore_num + else: + core_num = self._aiv_num + return core_num + + def format_suggestion_content(self, profiling_data: ProfilingDataset) -> None: + if profiling_data.PROF_TYPE == constant.ASCEND_PYTORCH_PROFILER: + self._SUGGESTION.append(self.PyTorch_OPERATOR_TUNE_SUGGESTION) + elif profiling_data.PROF_TYPE == constant.MSLITE: + self._SUGGESTION.append(self.MSLite_OPERATOR_TUNE_SUGGESTION) diff --git a/profiler/advisor/analyzer/computation/bound/operator_bound_checker.py b/profiler/advisor/analyzer/computation/bound/operator_bound_checker.py new file mode 100644 index 000000000..4ede3c94e --- /dev/null +++ b/profiler/advisor/analyzer/computation/bound/operator_bound_checker.py @@ -0,0 +1,56 @@ +import logging +from typing import List + +from profiler.advisor.analyzer.computation.operator_checker import OperatorChecker +from profiler.advisor.common import constant +from profiler.advisor.config.config import Config +from profiler.advisor.dataset.profiling.profiling_dataset import ProfilingDataset +from 
profiler.advisor.utils.utils import to_percent + +logger = logging.getLogger() + + +class OperatorBoundChecker(OperatorChecker): + _MIN_TASK_DURATION = 20 # min task duration 20us + _CHECKER = "operator no bound" + _PROBLEM = "operator no bound" + _SUGGESTION: List[str] = [] + _description = ( + f"There is no mte, cube, vector, scalar ratio is more than {to_percent(Config().operator_bound_ratio)};\n" + + f"Top task duration operators need to be tuned are as follows: \n") + _ITEMS = [ + "op_name", "op_type", "task_type", "task_duration", "vec_ratio", "mac_ratio", "scalar_ratio", "mte1_ratio", + "mte2_ratio", "mte3_ratio", "block_dim", "input_shapes", "input_data_types", "input_formats", "output_shapes", + "output_data_types", "output_formats" + ] + + def _check_data(self, data): + if not self._check_summary(data): + return False + for op_info in data.op_summary.op_list: + if self._check_operator(op_info): + return True + + logger.warning(self.SKIP_CHECK_MSG, self._CHECKER, "ratio in op summary") + return False + + def _check_operator(self, op_info) -> bool: + bound_list = ["vec_ratio", "mac_ratio", "scalar_ratio", "mte1_ratio", "mte2_ratio", "mte3_ratio"] + ratio_list = [self.get_ratio(op_info, attr) for attr in bound_list] + if not any(ratio_list): + return False # no data, skip check + if any(ratio and ratio > Config().operator_bound_ratio for ratio in ratio_list): + return False + return True + + def make_render(self, html_render, record): + html_render.render_template(key="computation", + template_dir="templates", + template_name="operator_no_bound.html", + format_result=self.format_operator_result(record, constant.OPERATOR_OUT_TOPK)) + + def format_suggestion_content(self, profiling_data: ProfilingDataset) -> None: + if profiling_data.PROF_TYPE == constant.ASCEND_PYTORCH_PROFILER: + self._SUGGESTION.append(self.PyTorch_OPERATOR_TUNE_SUGGESTION) + elif profiling_data.PROF_TYPE == constant.MSLITE: + self._SUGGESTION.append(self.MSLite_OPERATOR_TUNE_SUGGESTION) 
diff --git a/profiler/advisor/analyzer/computation/op_compile/__init__.py b/profiler/advisor/analyzer/computation/op_compile/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/profiler/advisor/analyzer/computation/op_compile/dynamic_shape_checker.py b/profiler/advisor/analyzer/computation/op_compile/dynamic_shape_checker.py new file mode 100644 index 000000000..070b3a3b5 --- /dev/null +++ b/profiler/advisor/analyzer/computation/op_compile/dynamic_shape_checker.py @@ -0,0 +1,91 @@ +import copy +import logging +from typing import List + +from profiler.advisor.analyzer.computation.operator_checker import OperatorChecker +from profiler.advisor.common import constant +from profiler.advisor.dataset.profiling.info_collection import OpInfo +from profiler.advisor.result.item import OptimizeItem, StatisticsItem, OptimizeRecord + +logger = logging.getLogger() + + +class DynamicShapeChecker(OperatorChecker): + ENABLE_COMPILED_SUGGESTION = "Optimize by enabling compiled operator, such as:\n" \ + "`torch_npu.npu.set_compile_mode(jit_compile=False)`\n" + _SUGGESTION: List[str] = [ENABLE_COMPILED_SUGGESTION] + _CHECKER = "dynamic shape operator" + _PROBLEM = "Dynamic shape operator" + _description = f"Found all operators are dynamic shape" + _op_list: List[OpInfo] = [] + _tune_op_list: List[str] = [] # record op name to be tuned, and save to tune_ops_file.cfg + _op_views: List = [] + + def __init__(self, cann_version) -> None: + super().__init__(cann_version=cann_version) + + def check(self, profiling_database) -> bool: + less_than_cann800_list = [constant.CANN_VERSION_C30, constant.CANN_VERSION_C13, constant.CANN_VERSION_C15] + # CANN 8.0.0 之前从 ge_info 中获取 op_state 属性,进行动态 shape 逻辑判断 + if self.cann_version in less_than_cann800_list: + if hasattr(profiling_database, "ge_info"): + ge_info = profiling_database.ge_info + static_shape_operators = ge_info.get_static_shape_operators() + if len(static_shape_operators) == 0: + 
OperatorChecker.IS_ALL_OPERATOR_DYNAMIC_SHAPE = True + return True + else: + logger.warning( + "Skip dynamic shape checker because of not containing ge_info.db file in host filefloder.\n" + "To enable dynamic shape checker, please try to set data_simplification=False in experimental_config.\n" + "More details please refer to link : %s", constant.ASCEND_PROFILER_URL) + else: + # CANN 8.0.0 之后 op_state 属性从 op_summary 文件中获取 + if hasattr(profiling_database, "op_summary"): + static_shape_operators = profiling_database.op_summary.get_static_shape_operators() + if len(static_shape_operators) == 0: + OperatorChecker.IS_ALL_OPERATOR_DYNAMIC_SHAPE = True + return True + else: + logger.warning( + "Skip dynamic shape checker because of not containing op_summary.csv file in current filefloder." + ) + + return False + + def make_record(self, profiling_database) -> OptimizeRecord: + """ + make record for what and how to optimize + """ + + optimization_item = OptimizeItem( + self._PROBLEM, + self._description, + self._SUGGESTION + ) + statistics_item = StatisticsItem("", "", 1) + return OptimizeRecord(optimization_item, statistics_item) + + def format_operator_result(self, record, limit=-1): + """ + Format operator result to html + :param record: profiling check record + :param limit: Limit number of operator statistics lists. + :return: + """ + optimization_item = record.optimization_item + release_suggestion_list = [] + for suggestion in optimization_item.suggestion: + release_suggestion = copy.deepcopy(suggestion) + if release_suggestion == DynamicShapeChecker.ENABLE_COMPILED_SUGGESTION: + release_suggestion += \ + f"for details please refer to link : LINK" + release_suggestion_list.append(release_suggestion.replace('\n', '
')) + format_result = {"record": record.__dict__, "suggestion": '
'.join(release_suggestion_list)} + return format_result + + def make_render(self, html_render, record): + html_render.render_template(key="computation", + template_dir="templates", + template_name="operator_dynamic_shape.html", + format_result=self.format_operator_result(record)) diff --git a/profiler/advisor/analyzer/computation/operator_checker.py b/profiler/advisor/analyzer/computation/operator_checker.py new file mode 100644 index 000000000..6bb837004 --- /dev/null +++ b/profiler/advisor/analyzer/computation/operator_checker.py @@ -0,0 +1,282 @@ +import copy +import logging +from textwrap import fill +from typing import List + +from profiler.advisor.common import constant +from profiler.advisor.common.version_control import VersionControl +from profiler.advisor.config.config import Config +from profiler.advisor.dataset.profiling.info_collection import OpInfo +from profiler.advisor.dataset.profiling.profiling_dataset import ProfilingDataset +from profiler.advisor.result.item import OptimizeItem, StatisticsItem, OptimizeRecord +from profiler.advisor.utils.utils import safe_division + +logger = logging.getLogger() + + +class OperatorChecker(VersionControl): + _SUPPORT_VERSIONS = [constant.CANN_VERSION_C30, constant.CANN_VERSION_C13, constant.CANN_VERSION_C15, constant.CANN_VERSION_C17] + IS_ALL_OPERATOR_DYNAMIC_SHAPE = False + _MAX_TUNE_OP_NUM = constant.OPERATOR_OUT_TOPK + _MIN_TASK_DURATION = 0 + _MIN_TASK_DURATION_RATIO = 1.0 + _MIN_TOTAL_DURATION_RATIO = 1.0 + _CHECKER = str() + _PROBLEM = str() + _description = str() + STACK_INFO_ITEMS = "" + _ITEMS: List[str] = [] + _SUGGESTION: List[str] = [] + SKIP_CHECK_MSG = "Skip %s checker because of not containing %s" + _tune_op_info_list: List[OpInfo] = [] + PyTorch_OPERATOR_TUNE_SUGGESTION = f"Optimize operator by AOE, such as:\n" \ + f"'aoe --job_type=2 --model_path=$user_dump_path " \ + f"--tune_ops_file={Config().tune_ops_file}'\n" + MSLite_OPERATOR_TUNE_SUGGESTION = f"Optimize operator by AOE in mindspore lite 
framework, such as:\n" \ + f"converter_lite --fmk=ONNX --optimize=ascend_oriented --saveType=MINDIR " \ + f"--modelFile=$user_model.onnx --outputFile=user_model --configFile=./config.txt\n" + _tune_op_list: List[str] = [] + + def __init__(self, cann_version: str): + self.cann_version = cann_version + self._op_list: List[OpInfo] = [] + + def check(self, profiling_data: ProfilingDataset) -> bool: + """ + check if any operator need optimize + :param profiling_data: profiling datasest + :return: true or false + """ + if not self._check_data(profiling_data): + return False + + summary = profiling_data.op_summary + total_task_duration = 0.0 + max_task_duration = 0.0 + for op_info in summary.op_list: + if not self._check_operator(op_info): + continue + task_duration = float(op_info.task_duration) + total_task_duration += task_duration + max_task_duration = max(max_task_duration, task_duration) + self._op_list.append(op_info) + if task_duration > self._MIN_TASK_DURATION: + self._tune_op_info_list.append(op_info) + + if any([ + max_task_duration > self._MIN_TASK_DURATION, + round(safe_division(max_task_duration, summary.get_total_task_duration()), + 4) > self._MIN_TASK_DURATION_RATIO, + round(safe_division(total_task_duration, summary.get_total_task_duration()), 4) > + self._MIN_TOTAL_DURATION_RATIO, + ]): + self._op_list.sort(key=lambda x: float(x.get_attr("task_duration")), reverse=True) + self._tune_op_info_list.sort(key=lambda x: float(x.get_attr("task_duration")), reverse=True) + for op in self._op_list: + if op.op_name not in self._tune_op_list and len(self._tune_op_list) < constant.OPERATOR_OUT_TOPK: + self._tune_op_list.append(op.op_name) + return True + return False + + def make_record(self, profiling_data: ProfilingDataset): + """ + Make record for what and how to optimize + :param profiling_data: profiling data + :return: optimize record + """ + task_duration_list = [float(op_info.get_attr("task_duration")) for op_info in self._op_list if + hasattr(op_info, 
"get_attr")] + total_cost_time = sum(task_duration_list) + total_task_duration = profiling_data.op_summary.get_total_task_duration() + count = len(task_duration_list) + statistics_item = StatisticsItem(total_task_duration, total_cost_time, count, self.get_incomes()) + optimization_item = OptimizeItem( + self._PROBLEM, + self._get_description(self._description, self.get_op_type_list(self._op_list)[:self._MAX_TUNE_OP_NUM]), + self._SUGGESTION + ) + return OptimizeRecord(optimization_item, statistics_item) + + def _get_description(self, description, op_type_list=None): + if not op_type_list: + return description + + desc_suffix = [] + for i in range(len(op_type_list)): + if i % 3 == 0 and i != 0: + desc_suffix.append("\n") + + desc_suffix.append(f"{op_type_list[i]}") + + if i < len(op_type_list) - 1: + desc_suffix.append(", ") + + description += "".join(desc_suffix) + return description + + def pre_check(self, profiling_data) -> bool: + self.format_suggestion_content(profiling_data) + return not (OperatorChecker.IS_ALL_OPERATOR_DYNAMIC_SHAPE and ( + OperatorChecker.PyTorch_OPERATOR_TUNE_SUGGESTION or OperatorChecker.MSLite_OPERATOR_TUNE_SUGGESTION + ) in self._SUGGESTION) + + def format_operator_result(self, record, limit): + """ + Format operator result to html + :param record: profiling check record + :param limit: Limit number of operator statistics lists. 
+ :return: + """ + optimization_item = record.optimization_item + release_suggestion_list = [] + for suggestion in optimization_item.suggestion: + release_suggestion = copy.deepcopy(suggestion) + if release_suggestion == OperatorChecker.PyTorch_OPERATOR_TUNE_SUGGESTION: + release_suggestion += \ + (f"for details please refer to link : LINK") + elif release_suggestion == OperatorChecker.MSLite_OPERATOR_TUNE_SUGGESTION: + release_suggestion += \ + (f"\nThe config file for MSLite AOE usage is as follows:\n" \ + f"[ascend_context]\n" \ + f"aoe_mode=\"operator tuning\"\n" \ + f"--tune_ops_file={Config().tune_ops_file}\n" + f"\nFor details please refer to link : LINK") + release_suggestion_list.append(release_suggestion.replace('\n', '
')) + format_result = {"record": record.__dict__, + "suggestion": fill('
'.join(release_suggestion_list), width=200), + "task_duration": round(record.statistics_item.task_duration, 2)} + statistic = self.group_by(copy.deepcopy(self._op_list), limit=limit) + format_result["statistic"] = statistic + return format_result + + def group_by(self, op_list, op_key="op_type", + limit: int = constant.OPERATOR_LIST_UNLIMIT): + """ + group by Profiling.OpInfo's attribute key, then return top limit tuple by duration + :param op_list: input a OpInfo list + :param op_key: group by Profiling.OpInfo's attribute key + :param limit: top limit num, if you do not need to limit the length of tuple, input -1(int) + :return: + """ + if op_list is None: + op_list = [] + statistic = {} # str, json + for op_info in op_list: + if statistic.get(op_info.get_attr(op_key)): + statistic[op_info.get_attr(op_key)]["summary"]["total_duration"] = float( + statistic[op_info.get_attr(op_key)]["summary"]["total_duration"]) + float( + op_info.get_attr("task_duration", constant.DEFAULT_DURATION_ZERO)) + statistic[op_info.get_attr(op_key)]["summary"]["counts"] += 1 + stack_info = op_info.get_attr("stack_info") + if stack_info: + op_info.stack_info = stack_info.replace('\r\n', '
') + statistic[op_info.get_attr(op_key)]["op_info_list"].append(op_info) + else: + statistic[op_info.get_attr(op_key)] = {"summary": {}, "op_info_list": []} + statistic[op_info.get_attr(op_key)]["summary"]["op_type"] = op_info.get_attr( + "op_type", constant.DEFAULT_OPERATOR_TYPE) + statistic[op_info.get_attr(op_key)]["summary"]["total_duration"] = float( + op_info.get_attr("task_duration", constant.DEFAULT_DURATION_ZERO)) + statistic[op_info.get_attr(op_key)]["summary"]["counts"] = 1 + stack_info = op_info.get_attr("stack_info") + if stack_info: + op_info.stack_info = stack_info.replace('\r\n', '
') + statistic[op_info.get_attr(op_key)]["op_info_list"] = [op_info] + + if statistic: + for op_key in statistic.keys(): + statistic[op_key]["summary"]["total_duration"] = round( + statistic[op_key]["summary"]["total_duration"], 2) + # Grouped by op_type, sorted by total_duration, and obtained the top 10 operators that take the most time. + if limit > 0: + statistic = sorted( + statistic.items(), key=lambda kv: kv[1]["summary"]["total_duration"], reverse=True)[:limit] + else: + statistic = sorted(statistic.items(), key=lambda kv: kv[1]["summary"]["total_duration"], reverse=True) + else: + logger.warning("%s checker do not has results to format html", str(self.__class__.__name__)) + return statistic + + def _check_data(self, profiling_data): + return True + + def _check_operator(self, op_info): + return False + + def _get_income(self, _op_info: OpInfo) -> float: + return 0 + + def get_tune_op_list(self): + """ + get tune op list + :return: tune op list + """ + return self._tune_op_list + + def get_views(self, _graph_data): + """Get node views.""" + return [] + + @classmethod + def get_name(cls): + """ + get name of checker + :return: checker name + """ + return cls._PROBLEM + + def get_incomes(self) -> float: + """get incomes""" + incomes = 0.0 + for op_info in self._op_list: + income = self._get_income(op_info) + setattr(op_info, "income", round(income, 2)) + incomes += income + return incomes + + def get_op_type_list(self, op_list: List[OpInfo]): + """get op type list""" + op_type_list = [] + for op_info in op_list: + if op_info.op_type not in op_type_list: + op_type_list.append(op_info.op_type) + return op_type_list + + def _check_summary(self, data: ProfilingDataset): + if not hasattr(data, "op_summary"): + logger.warning(self.SKIP_CHECK_MSG, self._CHECKER, "op summary") + return False + return True + + @staticmethod + def get_ratio(op_info: OpInfo, attr: str) -> float: + if not op_info.has_attr(attr): + return 0 + value = op_info.get_attr(attr) + if not value 
or value == "N/A": + return 0 + return float(value) + + def get_details(self) -> list: + """ + get details of operator to be optimized + :return: detail list + """ + op_list = self._op_list + if not op_list or not (self._ITEMS + [self.STACK_INFO_ITEMS]): + return [] + details = [] + attrs = [attr for attr in (self._ITEMS + [self.STACK_INFO_ITEMS]) if op_list[0].has_attr(attr)] + details.append(attrs) + op_list = sorted(op_list, key=lambda x: float(x.get_attr("task_duration")), reverse=True) + for op_info in op_list: + content = [ + op_info.get_attr(attr) if attr != "aicore_time" + else op_info.get_float_attr(attr, strict_mode=True) + + op_info.get_float_attr("aiv_time", strict_mode=True) for attr in attrs + ] + details.append(content) + return details + + def format_suggestion_content(self, profiling_data: ProfilingDataset) -> None: + return diff --git a/profiler/advisor/analyzer/computation/profiling_analyzer.py b/profiler/advisor/analyzer/computation/profiling_analyzer.py new file mode 100644 index 000000000..98d3c5c49 --- /dev/null +++ b/profiler/advisor/analyzer/computation/profiling_analyzer.py @@ -0,0 +1,71 @@ +import logging +from abc import ABC +from typing import Dict, List + +from profiler.advisor.analyzer.base_analyzer import BaseAnalyzer +from profiler.advisor.common import constant +from profiler.advisor.result.result import OptimizeResult +from profiler.advisor.analyzer.computation.aicpu.aicpu_checker import AicpuChecker +from profiler.advisor.analyzer.computation.bound.block_dim_checker import BlockDimChecker +from profiler.advisor.analyzer.computation.bound.operator_bound_checker import OperatorBoundChecker +from profiler.advisor.analyzer.computation.operator_checker import OperatorChecker +from profiler.advisor.analyzer.computation.op_compile.dynamic_shape_checker import DynamicShapeChecker +from profiler.advisor.analyzer.computation.operator_checker import OperatorChecker +from profiler.advisor.display.html.render import HTMLRender +from 
class ProfilingAnalyzer(BaseAnalyzer, ABC):
    """Runs every supported OperatorChecker over a ProfilingDataset and aggregates results."""

    dataset_cls_list = [ProfilingDataset]

    def __init__(self, collection_path, **kwargs) -> None:
        cann_version = kwargs.get("cann_version", constant.DEFAULT_CANN_VERSION)
        torch_version = kwargs.get("torch_version", constant.DEFAULT_TORCH_VERSION)
        super().__init__(collection_path, cann_version=cann_version, torch_version=torch_version, **kwargs)
        self.checker_list = [checker(cann_version) for checker in get_supported_subclass(OperatorChecker, cann_version)]
        # Move DynamicShapeChecker to the front: AOE operator tuning does not support
        # dynamic shape, so the AOE tuning checker can be skipped once dynamic shape is detected.
        index = next((i for i, item in enumerate(self.checker_list) if isinstance(item, DynamicShapeChecker)), None)
        # fix: the original called pop(index) unconditionally, raising
        # TypeError: 'NoneType' when no DynamicShapeChecker is registered
        if index is not None:
            self.checker_list.insert(0, self.checker_list.pop(index))
        self.html_render = HTMLRender()
        self.result = OptimizeResult()

    @BaseAnalyzer.check_data((ProfilingDataset.get_key(),))
    def optimize(self) -> OptimizeResult:
        """
        Run each applicable checker and collect its record, detail rows and
        tune-op list into a single OptimizeResult.
        :return: aggregated optimize result
        """
        profiling_data = self.get_first_data_by_key(self.dataset_list, ProfilingDataset.get_key())
        for checker in self.checker_list:
            if not checker.pre_check(profiling_data):
                continue
            if checker.check(profiling_data):
                # add record
                record = checker.make_record(profiling_data)
                checker.make_render(self.html_render, record)
                self.result.add(record)
                # add details; the first row returned by get_details() is the header
                details = checker.get_details()
                if details:
                    for i, detail in enumerate(details):
                        if i == 0:
                            self.result.add_detail(checker.get_name(), headers=detail)
                        else:
                            self.result.add_detail(checker.get_name(), detail=detail)
                # add tune op list
                tune_op_list = checker.get_tune_op_list()
                if tune_op_list:
                    self.result.add_tune_op_list(tune_op_list)

        return self.result

    def make_record(self):
        """Records are produced per-checker inside optimize(); nothing to do at analyzer level."""
        pass

    def make_render(self):
        """Rendering is done per-checker inside optimize(); nothing to do at analyzer level."""
        pass