diff --git a/profiler/cluster_analyse/analysis/analysis_facade.py b/profiler/cluster_analyse/analysis/analysis_facade.py index 06be6002e1e075645dd21cd1328505829a9b3305..74ced48ce14953688f10565c980b1216b1f6fafc 100644 --- a/profiler/cluster_analyse/analysis/analysis_facade.py +++ b/profiler/cluster_analyse/analysis/analysis_facade.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023, Huawei Technologies Co., Ltd. +# Copyright (c) 2024, Huawei Technologies Co., Ltd. # All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -18,10 +18,11 @@ from multiprocessing import Process from analysis.communication_analysis import CommunicationAnalysis from analysis.comm_matrix_analysis import CommMatrixAnalysis from analysis.step_trace_time_analysis import StepTraceTimeAnalysis - +from common_func.context import Context +from common_func.constant import Constant class AnalysisFacade: - analysis_module = {CommunicationAnalysis, StepTraceTimeAnalysis, CommMatrixAnalysis} + default_module = {CommunicationAnalysis, StepTraceTimeAnalysis, CommMatrixAnalysis} def __init__(self, params: dict): self.params = params @@ -29,10 +30,19 @@ class AnalysisFacade: def cluster_analyze(self): # 多个profiler用多进程处理 process_list = [] - for analysis in self.analysis_module: + for analysis in self.default_module: process = Process(target=analysis(self.params).run) process.start() process_list.append(process) for process in process_list: process.join() + + def recipe_analyze(self): + print("[INFO] Recipe analysis launched.") + try: + with Context.create_context(self.params.get(Constant.PARALLEL_MODE)) as context: + with self.params.get(Constant.RECIPE_CLASS)(self.params) as recipe: + recipe.run(context) + except Exception as e: + print("[ERROR] Recipe analysis launched failed.") \ No newline at end of file diff --git a/profiler/cluster_analyse/analysis/base_analysis.py b/profiler/cluster_analyse/analysis/base_analysis.py index cc803813dda4a535c529a935c1b42dae197855c9..ce4216639fa4bb56a2bbc2f6b2031229cc12c59d 100644 --- a/profiler/cluster_analyse/analysis/base_analysis.py +++ b/profiler/cluster_analyse/analysis/base_analysis.py @@ -1,7 +1,23 @@ +# Copyright (c) 2024, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from abc import abstractmethod from common_func.constant import Constant from utils.data_transfer_adapter import DataTransferAdapter from common_func.file_manager import FileManager +import os class BaseAnalysis: @@ -75,3 +91,38 @@ class BaseAnalysis: for rank_tup, group_dict in self.comm_ops_struct.items(): for step_id, communication_ops in group_dict.items(): self.compute_total_info(communication_ops) + + +class BaseRecipeAnalysis: + def __init__(self, params): + self._params = params + self._collection_dir = params.get(Constant.COLLECTION_PATH, "") + self._data_map = params.get(Constant.DATA_MAP, {}) + self._recipe_name = params.get(Constant.RECIPE_NAME, "") + self._mode = params.get(Constant.PARALLEL_MODE, "") + self._analysis_dict = {} + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self._params is not None and exc_type is not None: + print(f"[ERROR] Failed to exit analysis: {exc_val}") + def run(self, context): + self._analysis_dict = { + "Mode": self.get_mode(), + "RecipeName": self.get_recipe_name() + } + + def _get_rank_db(self): + db_paths = [os.path.join(rank_path, + Constant.CLUSTER_ANALYSIS_OUTPUT, + f"ascend_pytorch_profiler_{rank_id}.db") + for rank_id, rank_path in self._data_map.items()] + return db_paths + + def get_mode(self): + return self._mode + + def get_recipe_name(self): + return self._recipe_name \ No newline at end of file diff --git a/profiler/cluster_analyse/analysis/cann_api_sum.py b/profiler/cluster_analyse/analysis/cann_api_sum.py new file mode 100644 index 0000000000000000000000000000000000000000..0265fb41d115a9c3b48dc24a7597cf89f70e74ba --- /dev/null +++ b/profiler/cluster_analyse/analysis/cann_api_sum.py @@ -0,0 +1,58 @@ +# Copyright (c) 2024, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from analysis.base_analysis import BaseRecipeAnalysis +from common_func.constant import Constant +from cluster_statistics_export.cann_api_sum_export import CannApiSumExport +class CannApiSum(BaseRecipeAnalysis): + def __init__(self, params): + super().__init__(params) + print("[INFO] CannApiSum init.") + + @staticmethod + def _mapper_func(db_path, params): + df = CannApiSumExport(db_path, params.get(Constant.RECIPE_NAME)).read_export_db() + + if df is None or df.empty: + print("[WARNING] There is no stats data.") + return None + + return df + + def mapper_func(self, context): + return context.map( + self._mapper_func, + self._get_rank_db(), + params=self._params + ) + + def reducer_func(self, mapper_res): + pass + + def run(self, context): + super().run(context) + + mapper_res = self.mapper_func(context) + self.reducer_func(mapper_res) + + self.save_notebook() + self.save_analysis_file() + + + def save_notebook(self): + pass + + def save_analysis_file(self): + pass \ No newline at end of file diff --git a/profiler/cluster_analyse/cluster_analysis.py b/profiler/cluster_analyse/cluster_analysis.py index 24454622119acbb223c70dfea65d3b792b00444c..beb582b95361baa587047a1011a1b36441cc9161 100644 --- a/profiler/cluster_analyse/cluster_analysis.py +++ b/profiler/cluster_analyse/cluster_analysis.py @@ -22,9 +22,29 @@ from communication_group.communication_group_generator import CommunicationGroup from common_func.constant import Constant from common_func.file_manager import FileManager from common_func.path_manager import PathManager +from common_func import analysis_loader from analysis.analysis_facade import AnalysisFacade +def get_analysis_args(analysis_class, analysis_args): + parser = argparse.ArgumentParser(description="custom analysis args") + parser.add_argument("--parallel_mode", type=str, help="context mode", default="concurrent") + return parser.parse_args(analysis_args) + +def parse_recipe_params(analysis_name, analysis_args): + analysis_class = analysis_loader.get_class_from_name(analysis_name) + if not analysis_class: + print("[ERROR] undefined analysis.") + return None + + args_parsed = get_analysis_args(analysis_class, analysis_args) + recipe_params = { + Constant.RECIPE_NAME: analysis_class[0], + Constant.RECIPE_CLASS: analysis_class[1], + Constant.PARALLEL_MODE: args_parsed.parallel_mode + } + return recipe_params + class Interface: ASCEND_PT = "ascend_pt" ASCEND_MS = "ascend_ms" @@ -37,6 +57,9 @@ class Interface: self.collective_group_dict = {} self.communication_ops = [] self.matrix_ops = [] + self.recipe_name = params.get(Constant.RECIPE_NAME) + self.recipe_class = params.get(Constant.RECIPE_CLASS) + self.recipe_parallel_mode = params.get(Constant.PARALLEL_MODE) def allocate_prof_data(self): ascend_pt_dirs = [] @@ -67,25 +90,37 @@ class Interface: if data_type == Constant.INVALID: print("[ERROR] The current folder contains both DB and other files. Please check.") return - params = { - Constant.COLLECTION_PATH: self.collection_path, - Constant.DATA_MAP: data_map, - Constant.ANALYSIS_MODE: self.analysis_mode, - Constant.DATA_TYPE: data_type - } - comm_data_dict = CommunicationGroupGenerator(params).generate() - params[Constant.COMM_DATA_DICT] = comm_data_dict - AnalysisFacade(params).cluster_analyze() + if self.analysis_mode == "recipe": + params = { + Constant.COLLECTION_PATH: self.collection_path, + Constant.DATA_MAP: data_map, + Constant.RECIPE_NAME: self.recipe_name, + Constant.RECIPE_CLASS: self.recipe_class, + Constant.PARALLEL_MODE: self.recipe_parallel_mode + } + AnalysisFacade(params).recipe_analyze() + else: + params = { + Constant.COLLECTION_PATH: self.collection_path, + Constant.DATA_MAP: data_map, + Constant.ANALYSIS_MODE: self.analysis_mode, + Constant.DATA_TYPE: data_type + } + comm_data_dict = CommunicationGroupGenerator(params).generate() + params[Constant.COMM_DATA_DICT] = comm_data_dict + AnalysisFacade(params).cluster_analyze() if __name__ == "__main__": parser = argparse.ArgumentParser(description="cluster analysis module") parser.add_argument('-d', '--collection_path', type=str, required=True, help="profiling data path") - parser.add_argument('-m', '--mode', choices=['all', 'communication_time', 'communication_matrix'], + parser.add_argument('-m', '--mode', choices=Constant.ALL_FEATURE_LIST, default='all', help="different analysis mode") - args_parsed = parser.parse_args() + args_parsed, args_remained = parser.parse_known_args() parameter = { Constant.COLLECTION_PATH: args_parsed.collection_path, Constant.ANALYSIS_MODE: args_parsed.mode } + if args_parsed.mode not in Constant.COMM_FEATURE_LIST: + parameter.update(parse_recipe_params(args_parsed.mode, args_remained)) Interface(parameter).run() diff --git a/profiler/cluster_analyse/cluster_statistics_export/__init__.py b/profiler/cluster_analyse/cluster_statistics_export/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..7101187a2c2619f3b1c20dded14b433950b4c662 --- /dev/null +++ b/profiler/cluster_analyse/cluster_statistics_export/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) 2024, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/profiler/cluster_analyse/cluster_statistics_export/cann_api_sum_export.py b/profiler/cluster_analyse/cluster_statistics_export/cann_api_sum_export.py new file mode 100644 index 0000000000000000000000000000000000000000..9b10479789766754263e14a879136c5480478f74 --- /dev/null +++ b/profiler/cluster_analyse/cluster_statistics_export/cann_api_sum_export.py @@ -0,0 +1,64 @@ +# Copyright (c) 2024, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from cluster_statistics_export.stats_export import StatsExport + +QUERY = """ +WITH + summary as ( + SELECT + name, + sum(endNs - startNs) AS duration, + count (*) AS num, + avg(endNs - startNs) AS avg_duration, + min(endNs - startNs) AS min_duration, + median(endNs - startNs) AS median_duration, + max(endNs - startNs) AS max_duration, + stdev(endNs - startNs) AS stddev, + lower_quartile(endNs - startNs) AS q1, + upper_quartile(endNs - startNs) AS q3 + FROM + CANN_API + GROUP BY name + ), + totals AS ( + SELECT sum(duration) AS total + FROM summary + ) +SELECT + round(summary.duration * 100.0 / totals.total, 2) AS "duration_ratio: %", + summary.duration AS "Total Time: ns", + summary.num AS "Total Count", + round(summary.avg_duration, 1) AS "Average: ns", + summary.min_duration, 1 AS "Min: ns", + round(summary.median_duration, 1) AS "Med: ns", + summary.max_duration, 1 AS "Max: ns", + round(summary.stddev, 1) AS "StdDev: ns" + summary.q1 AS "Q1" + summary.q3 AS "Q3" +FROM + summary +LEFT JOIN + STRING_IDS AS ids + ON ids.id == summary.name +ORDER BY 2 DESC + """ +class CannApiSumExport(StatsExport): + + + def __init__(self, db_path, recipe_name): + super().__init__(db_path, recipe_name) + self._query = QUERY + print("[INFO] CannApiSumExport init.") \ No newline at end of file diff --git a/profiler/cluster_analyse/cluster_statistics_export/stats_export.py b/profiler/cluster_analyse/cluster_statistics_export/stats_export.py new file mode 100644 index 0000000000000000000000000000000000000000..4e0a98beb88eda88729098a5269627cfc6748716 --- /dev/null +++ b/profiler/cluster_analyse/cluster_statistics_export/stats_export.py @@ -0,0 +1,38 @@ +# Copyright (c) 2024, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pandas as pd + +from common_func.db_manager import DBManager + +class StatsExport: + + def __init__(self, db_path, recipe_name): + self._db_path = db_path + self._recipe_name = recipe_name + self._query = None + + def get_query(self): + return self._query + + def read_export_db(self): + query = self.get_query() + if query is None: + print(f"[ERROR] query is None.") + return + conn, cursor = DBManager.create_connect_db(self._db_path) + data = pd.read_sql(query, conn) + DBManager.destroy_db_connect(conn, cursor) + return data \ No newline at end of file diff --git a/profiler/cluster_analyse/common_func/analysis_loader.py b/profiler/cluster_analyse/common_func/analysis_loader.py new file mode 100644 index 0000000000000000000000000000000000000000..f7761e5892b9559eb8f9b319d9e3b05749c7fdd7 --- /dev/null +++ b/profiler/cluster_analyse/common_func/analysis_loader.py @@ -0,0 +1,38 @@ +# Copyright (c) 2024, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import importlib +import inspect +import sys + +from common_func.constant import Constant +from analysis.base_analysis import BaseRecipeAnalysis + +def is_analysis_class(obj): + return inspect.isclass(obj) and issubclass(obj, BaseRecipeAnalysis) and obj != BaseRecipeAnalysis + +def get_class_from_name(analysis_name : str): + sys.path.append(Constant.ANALYSIS_PATH) + analysis_path = f"analysis.{analysis_name}" + module = None + try: + module = importlib.import_module(analysis_path) + except Exception as e: + print(f"[ERROR] {analysis_path} not find:{e}") + + specific_analysis = inspect.getmembers(module, is_analysis_class) + if not specific_analysis: + print(f"[ERROR] {analysis_name} not found.") + return specific_analysis[0] \ No newline at end of file diff --git a/profiler/cluster_analyse/common_func/constant.py b/profiler/cluster_analyse/common_func/constant.py index 3b4126de792357b6a7a0d4d0d4dbce40067c4651..61922f2a3a8a24c1cfa53554e66111818d536d9e 100644 --- a/profiler/cluster_analyse/common_func/constant.py +++ b/profiler/cluster_analyse/common_func/constant.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os class Constant(object): # dir name @@ -103,3 +104,15 @@ class Constant(object): CONFIG = "config" EXPER_CONFIG = "experimental_config" EXPORT_TYPE = "_export_type" + + # recipe config + RECIPE_NAME = "recipe_name" + RECIPE_CLASS = "recipe_class" + PARALLEL_MODE = "parallel_mode" + CLUSTER_CUSTOM_ANALYSE_PATH = os.path.abspath(os.path.dirname(__file__)) + ANALYSIS_PATH = os.path.join(CLUSTER_CUSTOM_ANALYSE_PATH, 'analysis') + + CONCURRENT_MODE = "concurrent" + + COMM_FEATURE_LIST = ['all', 'communication_time', 'communication_matrix'] + ALL_FEATURE_LIST = ['all', 'communication_time', 'communication_matrix', 'cann_api_sum'] \ No newline at end of file diff --git a/profiler/cluster_analyse/common_func/context.py b/profiler/cluster_analyse/common_func/context.py new file mode 100644 index 0000000000000000000000000000000000000000..3a9f1c2adab10c1c9eada6624b1306cbfe06de8b --- /dev/null +++ b/profiler/cluster_analyse/common_func/context.py @@ -0,0 +1,79 @@ +# Copyright (c) 2024, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from functools import partial +from concurrent import futures +from common_func.constant import Constant + + +class Context(object): + """abstract base class""" + + ctx_map = None + + @classmethod + def create_context(cls, mode=Constant.CONCURRENT_MODE): + if cls.ctx_map is None: + keys = [Constant.CONCURRENT_MODE] + values = [ConcurrentContext] + cls.ctx_map = dict(zip(keys, values)) + + if mode not in cls.ctx_map: + raise NotImplementedError("mode must be in {}".format(keys)) + + return cls.ctx_map[mode]() + + def __init__(self): + print("[INFO] context {} initialized.".format(self._mode)) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + if exc_type is not None: + print(f"[ERROR] Failed to exit context: {exc_val}") + + def launch(self, func, *args, **kwargs): + raise NotImplementedError + + def map(self, func, *iterables, **kwargs): + raise NotImplementedError + +class ConcurrentContext(Context): + + def __init__(self, executor=None): + self._mode = Constant.CONCURRENT_MODE + super().__init__() + self._custom = executor is None + self._executor = executor or futures.ProcessPoolExecutor(max_workers=os.cpu_count()) + + def __enter__(self): + if self._executor is None: + raise RuntimeError("executor is None") + return self + + def close(self): + if self._custom: + self._executor.shutdown(wait=True) + self._executor = None + + def launch(self, func, *args, **kwargs): + return self._executor.submit(func, *args, **kwargs).result() + + def map(self, func, *iterables, **kwargs): + partial_func = partial(func, **kwargs) + return list(self._executor.map(partial_func, *iterables)) \ No newline at end of file