diff --git a/profiler/.DS_Store b/profiler/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..808e7adfe7fa8dc70ff506999083bdd74310035f Binary files /dev/null and b/profiler/.DS_Store differ diff --git a/profiler/msprof_analyze/.DS_Store b/profiler/msprof_analyze/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..d7eff12b5ed239dc982141feeab021026c9e1d85 Binary files /dev/null and b/profiler/msprof_analyze/.DS_Store differ diff --git a/profiler/msprof_analyze/cluster_analyse/.DS_Store b/profiler/msprof_analyze/cluster_analyse/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..e7211e0a48884917077a6796996fd8ea92a664d0 Binary files /dev/null and b/profiler/msprof_analyze/cluster_analyse/.DS_Store differ diff --git a/profiler/msprof_analyze/cluster_analyse/recipes/.DS_Store b/profiler/msprof_analyze/cluster_analyse/recipes/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..a94cbf9aed515d70a1a672e0d834abd485048f9e Binary files /dev/null and b/profiler/msprof_analyze/cluster_analyse/recipes/.DS_Store differ diff --git a/profiler/msprof_analyze/cluster_analyse/recipes/base_recipe_analysis.py b/profiler/msprof_analyze/cluster_analyse/recipes/base_recipe_analysis.py index 0d3fea0f401fa4030d9cdca443e37b4e7a1e8c97..e2491893677ece4cd22f260c6e8ef70b92cf0ce8 100644 --- a/profiler/msprof_analyze/cluster_analyse/recipes/base_recipe_analysis.py +++ b/profiler/msprof_analyze/cluster_analyse/recipes/base_recipe_analysis.py @@ -121,6 +121,16 @@ class BaseRecipeAnalysis(ABC): else: logger.error(f"Unknown dump data type: {type(data)}") + def append_data(self, data, file_name, table_name=None, index=True, custom_db_path=None): + if table_name: + result_db = custom_db_path if custom_db_path else os.path.join(self.output_path, file_name) + conn, cursor = DBManager.create_connect_db(result_db) + if isinstance(data, pd.DataFrame): + data.to_sql(table_name, conn, if_exists='append', index=index) + else: + logger.error(f"Unknown dump data type: {type(data)}") + DBManager.destroy_db_connect(conn, cursor) + def create_notebook(self, filename, notebook_template_dir=None, replace_dict=None): if notebook_template_dir is None: template_path = os.path.dirname(__file__) diff --git a/profiler/msprof_analyze/cluster_analyse/recipes/moe2commop/__init__.py b/profiler/msprof_analyze/cluster_analyse/recipes/moe2commop/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..de0604079e1323b2749bc801a6e8326893c73498 --- /dev/null +++ b/profiler/msprof_analyze/cluster_analyse/recipes/moe2commop/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) 2024, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. \ No newline at end of file diff --git a/profiler/msprof_analyze/cluster_analyse/recipes/moe2commop/moe2commop.py b/profiler/msprof_analyze/cluster_analyse/recipes/moe2commop/moe2commop.py new file mode 100644 index 0000000000000000000000000000000000000000..5efbbd8bb4792f1af1fd8434d624aaf2fc2dd58f --- /dev/null +++ b/profiler/msprof_analyze/cluster_analyse/recipes/moe2commop/moe2commop.py @@ -0,0 +1,117 @@ +# Copyright (c) 2024, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import pandas as pd + +from msprof_analyze.cluster_analyse.recipes.base_recipe_analysis import BaseRecipeAnalysis +from msprof_analyze.prof_common.constant import Constant +from msprof_analyze.prof_common.logger import get_logger +from msprof_analyze.prof_exports.moe2commop_export import Moe2CommopExport +from msprof_analyze.prof_exports.moe_max_comm_opId_export import MoeMaxCommOpIdExport +from msprof_analyze.prof_exports.moe_max_string_ids_export import MoeMaxStringIdsExport + +logger = get_logger() + +TABLE_COMMUNICATION_OP = "COMMUNICATION_OP" +TABLE_STRING_IDS = "STRING_IDS" +DEFAULT_RELAY = 0 +DEFAULT_RETRY = 0 +DEFAULT_DATA_TYPE = 0 +DEFAULT_ALG_TYPE = 0 +DEFAULT_COUNT = 0 +DEFAULT_OP_TYPE = 5 +class Moe2Commop(BaseRecipeAnalysis): + def __init__(self, params): + super().__init__(params) + logger.info("Moe2Commop init.") + self.communication_op = None + + @property + def base_dir(self): + return os.path.basename(os.path.dirname(__file__)) + + def run(self, context): + self.mapper_func(context) + + def _mapper_func(self, data_map, analysis_class): + profiler_db_path = data_map.get(Constant.PROFILER_DB_PATH) + + # 获取最大StringId + try: + df_max_stringId = MoeMaxStringIdsExport(profiler_db_path, analysis_class).read_export_db() + new_id = int(df_max_stringId['max_id'].iloc[0]) + 1 if not df_max_stringId.empty else 1 + except (KeyError, IndexError, ValueError) as e: + logger.warning(f"Failed to get max string ID: {str(e)}") + new_id = 1 + + # 创建StringId的MOE_group记录 + new_row = pd.DataFrame({ + 'id': [new_id], + 'value': ['MOE_group'] + }) + string_id_append = new_row[['id', 'value']] + self.append_data(data=string_id_append, file_name=profiler_db_path, + table_name=TABLE_STRING_IDS, index=False, custom_db_path=profiler_db_path) + + # 读取主数据 + df = Moe2CommopExport(profiler_db_path, analysis_class).read_export_db() + + if df is None or df.empty: + logger.warning(f"There is no stats data in {profiler_db_path}.") + return None + + # 列检查 + required_columns = ['id', 'startNs', 'endNs', 'connectionId'] + if not all(col in df.columns for col in required_columns): + logger.error(f"Missing required columns in DataFrame: {required_columns}") + return None + + # 获取最大opId + try: + df_max_opId = MoeMaxCommOpIdExport(profiler_db_path, analysis_class).read_export_db() + new_opId = int(df_max_opId['max_opId'].iloc[0]) + 1 if not df_max_opId.empty else 1 + except (KeyError, IndexError, ValueError) as e: + logger.warning(f"Failed to get max communication op ID: {str(e)}") + new_opId = 1 + + # 构建结果DataFrame + num_records = len(df) + df_concat = pd.DataFrame({ + 'opName': df['id'].astype(int), + 'startNs': df['startNs'].astype(int), + 'endNs': df['endNs'].astype(int), + 'connectionId': df['connectionId'].astype(int), + 'groupName': new_id, # 用STRING_IDS表的字符串ID代替字符串 + 'opId': range(new_opId, new_opId + num_records), + 'relay': DEFAULT_RELAY, + 'retry': DEFAULT_RETRY, + 'dataType': DEFAULT_DATA_TYPE, + 'algType': DEFAULT_ALG_TYPE, + 'count': DEFAULT_COUNT, + 'opType': DEFAULT_OP_TYPE + }, dtype=int) + + # 排序和保存 + communication_op = df_concat[ + ['opName', 'startNs', 'endNs', 'connectionId', 'groupName', 'opId', 'relay', 'retry', 'dataType', 'algType', + 'count', 'opType']] + communication_op = communication_op.copy() + communication_op.sort_values('startNs', ascending=True, inplace=True) + + self.append_data(data=communication_op, file_name=profiler_db_path, + table_name=TABLE_COMMUNICATION_OP, index=False, custom_db_path=profiler_db_path) + + return data_map.get(Constant.RANK_ID) \ No newline at end of file diff --git a/profiler/msprof_analyze/prof_exports/moe2commop_export.py b/profiler/msprof_analyze/prof_exports/moe2commop_export.py new file mode 100644 index 0000000000000000000000000000000000000000..c17ee55d9cf944fda6de33d106afa9f9dace483c --- /dev/null +++ b/profiler/msprof_analyze/prof_exports/moe2commop_export.py @@ -0,0 +1,42 @@ +# Copyright (c) 2024, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from msprof_analyze.prof_exports.base_stats_export import BaseStatsExport + +QUERY = """ +SELECT + NAME_IDS.id AS "id", + NAME_IDS.value AS "FuncName", + TASK.startNs, + Task.endNs, + Task.connectionId, + round(TASK.endNs - TASK.startNs) AS "Duration" +FROM + COMMUNICATION_SCHEDULE_TASK_INFO +JOIN + TASK + ON COMMUNICATION_SCHEDULE_TASK_INFO.globalTaskId = TASK.globalTaskId +LEFT JOIN + STRING_IDS AS NAME_IDS + on NAME_IDS.id = COMMUNICATION_SCHEDULE_TASK_INFO.name + WHERE NAME_IDS.value like "Moe%" + """ + + +class Moe2CommopExport(BaseStatsExport): + + def __init__(self, db_path, recipe_name): + super().__init__(db_path, recipe_name, step_range={}) + self._query = QUERY \ No newline at end of file diff --git a/profiler/msprof_analyze/prof_exports/moe_max_comm_opId_export.py b/profiler/msprof_analyze/prof_exports/moe_max_comm_opId_export.py new file mode 100644 index 0000000000000000000000000000000000000000..dda88331e4cc428978c7698ccee49f4be8163c84 --- /dev/null +++ b/profiler/msprof_analyze/prof_exports/moe_max_comm_opId_export.py @@ -0,0 +1,29 @@ +# Copyright (c) 2024, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from msprof_analyze.prof_exports.base_stats_export import BaseStatsExport + +QUERY = """ +SELECT + MAX(opId) AS max_opId +FROM + COMMUNICATION_OP + """ + +class MoeMaxCommOpIdExport(BaseStatsExport): + + def __init__(self, db_path, recipe_name): + super().__init__(db_path, recipe_name, step_range={}) + self._query = QUERY \ No newline at end of file diff --git a/profiler/msprof_analyze/prof_exports/moe_max_string_ids_export.py b/profiler/msprof_analyze/prof_exports/moe_max_string_ids_export.py new file mode 100644 index 0000000000000000000000000000000000000000..f7017cc002f84ade42fd07837c61d60f82e5be07 --- /dev/null +++ b/profiler/msprof_analyze/prof_exports/moe_max_string_ids_export.py @@ -0,0 +1,29 @@ +# Copyright (c) 2024, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from msprof_analyze.prof_exports.base_stats_export import BaseStatsExport + +QUERY = """ +SELECT + MAX(id) AS max_id +FROM + STRING_IDS + """ + +class MoeMaxStringIdsExport(BaseStatsExport): + + def __init__(self, db_path, recipe_name): + super().__init__(db_path, recipe_name, step_range={}) + self._query = QUERY \ No newline at end of file