diff --git a/profiler/msprof_analyze/cluster_analyse/recipes/pp_modeling/__init__.py b/profiler/msprof_analyze/cluster_analyse/recipes/pp_modeling/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..7101187a2c2619f3b1c20dded14b433950b4c662 --- /dev/null +++ b/profiler/msprof_analyze/cluster_analyse/recipes/pp_modeling/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) 2024, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/profiler/msprof_analyze/cluster_analyse/recipes/pp_modeling/pp_modeling.py b/profiler/msprof_analyze/cluster_analyse/recipes/pp_modeling/pp_modeling.py new file mode 100644 index 0000000000000000000000000000000000000000..4302c0cf524e1845905bafafb2be036a03150160 --- /dev/null +++ b/profiler/msprof_analyze/cluster_analyse/recipes/pp_modeling/pp_modeling.py @@ -0,0 +1,696 @@ +# Copyright (c) 2025, Huawei Technologies Co., Ltd. +# All rights reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
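+
+"""Pipeline-parallel (PP) modeling recipe.
+
+Reconstructs the 1F1B / VPP schedule from the hccl send/receive ops recorded in each
+rank's profiler DB, derives the idle interval before every op, and flags intervals
+that deviate from the expected forward/backward cadence by more than
+Constant.INTERVAL_MULTIPLE_THRESHOLD.
+"""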
+
+import os
+
+import pandas as pd
+
+from msprof_analyze.cluster_analyse.recipes.base_recipe_analysis import BaseRecipeAnalysis
+from msprof_analyze.prof_common.constant import Constant
+from msprof_analyze.prof_common.logger import get_logger
+from msprof_analyze.prof_exports.pp_modeling_export import PPModelingExport
+from msprof_analyze.prof_exports.mstx_step_export import MstxStepExport
+
+logger = get_logger()
+
+
+class SlowLinkTest(BaseRecipeAnalysis):
+
+    POLICY = "pp_policy"
+    LEVELS = "levels"
+    PP = "pp"
+    NUM_OF_MICROBATCH = "num_of_micro_batch"
+    STEP = "step"
+    VPP = "vpp"
+
+    DEFAULT_POLICY = "1F1B"
+    DEFAULT_LEVELS = 50
+    DEFAULT_PP = 5
+    DEFAULT_NUM_OF_MICROBATCH = 5
+    DEFAULT_STEP = 8
+    DEFAULT_VPP = 2
+
+    def __init__(self, params):
+        super().__init__(params)
+        self.TABLE_SLOW_LINK = "slowlinktest"
+        logger.info("SlowLinkTest init.")
+
+        policy = self._extra_args.get(self.POLICY, self.DEFAULT_POLICY)
+        levels = self._extra_args.get(self.LEVELS, self.DEFAULT_LEVELS)
+        pp = self._extra_args.get(self.PP, self.DEFAULT_PP)
+        num_of_micro_batch = self._extra_args.get(self.NUM_OF_MICROBATCH, self.DEFAULT_NUM_OF_MICROBATCH)
+        step = self._extra_args.get(self.STEP, self.DEFAULT_STEP)
+        vpp = self._extra_args.get(self.VPP, self.DEFAULT_VPP)
+
+        self.policy = policy if policy in ["1F1B", "VPP"] else self.DEFAULT_POLICY
+        # Arguments may arrive as ints (argparse) or digit strings; fall back to the
+        # default for anything else.
+        self.levels = self._to_int(levels, self.DEFAULT_LEVELS)
+        self.pp = self._to_int(pp, self.DEFAULT_PP)
+        self.num_of_micro_batch = self._to_int(num_of_micro_batch, self.DEFAULT_NUM_OF_MICROBATCH)
+        self.step = self._to_int(step, self.DEFAULT_STEP)
+        self.vpp = self._to_int(vpp, self.DEFAULT_VPP)
+        self.num_of_model_trunk = self.levels // self.pp // self.vpp
+
+    @staticmethod
+    def _to_int(value, default):
+        try:
+            return int(value)
+        except (TypeError, ValueError):
+            return default
+
+    @property
+    def base_dir(self):
+        return os.path.basename(os.path.dirname(__file__))
+
+    @classmethod
+    def add_parser_argument(cls, parser):
+        parser.add_argument("--pp_policy", type=str, choices=["1F1B", "VPP"], help="Pipeline policy.",
+                            default=cls.DEFAULT_POLICY)
+        parser.add_argument("--levels", type=int, help="Number of model layers.", default=cls.DEFAULT_LEVELS)
+        parser.add_argument("--pp", type=int, help="Pipeline model parallel size.", default=cls.DEFAULT_PP)
+        parser.add_argument("--num_of_micro_batch", type=int, help="Number of micro batches.",
+                            default=cls.DEFAULT_NUM_OF_MICROBATCH)
+        parser.add_argument("--step", type=int, help="Rank interval within a pp stage.", default=cls.DEFAULT_STEP)
+        parser.add_argument("--vpp", type=int, help="Virtual pipeline stage size.", default=cls.DEFAULT_VPP)
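+
+    # Example invocation (hypothetical data path; flag names as registered above,
+    # assuming the standard cluster recipe entry point):
+    #   msprof-analyze cluster -d ./cluster_data -m pp_modeling --pp_policy 1F1B --pp 5 \
+    #       --num_of_micro_batch 5 --step 8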
+    def pp_model(self, mapper_res):
+        # Drop ranks whose mapper produced no data.
+        mapper_res = list(filter(lambda df: df is not None, mapper_res))
+
+        # If nothing is left, log an error and bail out.
+        if not mapper_res:
+            logger.error("Mapper data is empty. Please check the input or data source.")
+            return
+        dataframes = [pd.DataFrame(item) for item in mapper_res]
+
+        all_df = pd.concat(dataframes, ignore_index=True)
+        profiler_pp_start = []
+        # For every profiler step, record the earliest send/recv start time across all
+        # ranks (this is not the same as the profiler step's own start time).
+        for profiler in all_df[Constant.PROFILER_STEP].unique():
+            profiler_pp_start.append(min(all_df[all_df[Constant.PROFILER_STEP] == profiler][Constant.TS]))
+        result_df = []
+        for df in dataframes:
+            if self.policy == "1F1B":
+                result_df.append(self.modeling(df, profiler_pp_start, df["rankId"].iloc[0]))
+            elif self.policy == "VPP":
+                result_df.append(self.vpp_modeling(df, profiler_pp_start, df["rankId"].iloc[0]))
+        self.result_res = pd.concat(result_df, ignore_index=True)
+
+    def run(self, context):
+        mapper_res = self.mapper_func(context)
+        self.pp_model(mapper_res)
+
+        if self._export_type == "db":
+            self.save_db()
+        elif self._export_type == "notebook":
+            self.save_notebook()
+        else:
+            logger.error("Unknown export type.")
+
+    def save_notebook(self):
+        self.dump_data(self.result_res, "pipeline_modeling.csv", index=False)
+        self.create_notebook("stats.ipynb")
+        self.add_helper_file("cluster_display.py")
+
+    def save_db(self):
+        self.dump_data(self.result_res, Constant.DB_CLUSTER_COMMUNICATION_ANALYZER, self.TABLE_SLOW_LINK,
+                       index=False)
+
+    def _mapper_func(self, data_map, analysis_class):
+        profiler_db_path = data_map.get(Constant.PROFILER_DB_PATH)
+        rank_id = data_map.get(Constant.RANK_ID)
+        df = PPModelingExport(profiler_db_path, analysis_class).read_export_db()
+        if df is None or df.empty:
+            logger.warning(f"There is no stats data in {profiler_db_path}.")
+            return None
+        step_df = MstxStepExport(profiler_db_path, analysis_class).read_export_db()
+
+        def find_step(ts):
+            # Map an op timestamp onto the profiler step whose window contains it.
+            for _, row in step_df.iterrows():
+                if row['start_ns'] <= ts <= row['end_ns']:
+                    return row['step_id']
+            return None
+
+        df[Constant.PROFILER_STEP] = df[Constant.TS].apply(find_step)
+        df.insert(0, "rankId", rank_id)
+        return df
+
+    # 1F1B pipeline modeling.
+    def modeling(self, df, start, rank_id):
+        result_df = None
+        # Which pp stage this rank belongs to.
+        rank_index = rank_id // self.step
+        # Handle each profiler step separately.
+        for profiler, start_ts in zip(df[Constant.PROFILER_STEP].unique(), start):
+            # Extract the sends of one profiler step, reset the index, and derive from ts
+            # and dur the idle interval between the previous op's end and the current op's
+            # start. The first op is measured against the earliest op start of the step.
+            send_df = df[df[Constant.OP_TYPE] == Constant.HCOM_SEND]
+            send_df = send_df[send_df[Constant.PROFILER_STEP] == profiler]
+            send_df = send_df.reset_index(drop=True)
+            send_df.insert(send_df.columns.get_loc(Constant.DUR) + 1, Constant.INTERVAL,
+                           send_df[Constant.TS].astype(float))
+            first = float(send_df.loc[0, Constant.INTERVAL])
+            send_df[Constant.INTERVAL] = (send_df[Constant.INTERVAL] - send_df[Constant.INTERVAL].shift(1)
+                                          - send_df[Constant.DUR].shift(1))
+            send_df.loc[0, Constant.INTERVAL] = first - start_ts
+            send_df.insert(send_df.columns.get_loc(Constant.OP_TYPE) + 1, Constant.PP_STAGE,
+                           [rank_index] * len(send_df))
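+            # Interval sketch with hypothetical ns values: ts = [100, 250], dur = [30, 30],
+            # step start = 90 -> interval = [100 - 90, 250 - 100 - 30] = [10, 120].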
+
+            # Extract the recvs of the same profiler step.
+            recv_df = df[df[Constant.OP_TYPE] == Constant.HCOM_RECEIVE]
+            recv_df = recv_df[recv_df[Constant.PROFILER_STEP] == profiler]
+            recv_df = recv_df.reset_index(drop=True)
+            recv_df.insert(recv_df.columns.get_loc(Constant.DUR) + 1, Constant.INTERVAL,
+                           recv_df[Constant.TS].astype(float))
+            first = float(recv_df.loc[0, Constant.INTERVAL])
+            recv_df[Constant.INTERVAL] = (recv_df[Constant.INTERVAL] - recv_df[Constant.INTERVAL].shift(1)
+                                          - recv_df[Constant.DUR].shift(1))
+            recv_df.loc[0, Constant.INTERVAL] = first - start_ts
+            recv_df.insert(recv_df.columns.get_loc(Constant.OP_TYPE) + 1, Constant.PP_STAGE,
+                           [rank_index] * len(recv_df))
+
+            # First pp stage.
+            if rank_index == 0:
+                # Estimate the healthy intervals.
+                forward_time = min(send_df[Constant.INTERVAL].head(self.pp))
+                backward_time = min(recv_df[Constant.INTERVAL].tail(self.pp - 1))
+
+                logger.info("profiler step %s expected forward and backward time: %s, %s",
+                            profiler, forward_time, backward_time)
+                # On the first stage every send is a forward send.
+                send_df.insert(send_df.columns.get_loc(Constant.PP_STAGE) + 1, Constant.FORWARD_BACKWARD,
+                               [Constant.FORWARD] * len(send_df))
+                note = [None] * (self.pp - 1) + [Constant.PP_BUBBLE] + [None] * (len(send_df) - self.pp)
+                send_df.insert(send_df.columns.get_loc(Constant.FORWARD_BACKWARD) + 1, Constant.NOTE, note)
+                normal_list = []
+                # Check whether each interval is within tolerance.
+                for index, interval in enumerate(send_df[Constant.INTERVAL]):
+                    # The first pp sends (warm-up): expect a forward interval.
+                    if index < self.pp:
+                        if abs(interval / forward_time - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                    # The steady 1F1B sends: expect forward + backward.
+                    else:
+                        if abs(interval / (forward_time + backward_time) - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                send_df.insert(send_df.columns.get_loc(Constant.NOTE) + 1, Constant.NORMAL_INTERVAL, normal_list)
+                # Insert batch_id.
+                batch_id = [i + 1 for i in range(len(send_df))]
+                send_df.insert(send_df.columns.get_loc(Constant.FORWARD_BACKWARD) + 1, Constant.BATCH_ID, batch_id)
+                if result_df is None:
+                    result_df = send_df
+                else:
+                    result_df = pd.concat([result_df, send_df])
+
+                # On the first stage every recv is a backward recv.
+                recv_df.insert(recv_df.columns.get_loc(Constant.PP_STAGE) + 1, Constant.FORWARD_BACKWARD,
+                               [Constant.BACKWARD] * len(recv_df))
+                note = ([Constant.PP_BUBBLE]
+                        + [None] * (len(recv_df) - self.pp)
+                        + [Constant.PP_GAP] * (self.pp - 1))
+                recv_df.insert(recv_df.columns.get_loc(Constant.FORWARD_BACKWARD) + 1, Constant.NOTE, note)
+                normal_list = []
+                for index, interval in enumerate(recv_df[Constant.INTERVAL]):
+                    # The first recv waits for the whole warm-up.
+                    if index == 0:
+                        expected_interval = forward_time * self.pp
+                        if abs(interval / expected_interval - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                    # Before the last pp - 1 recvs: the steady 1F1B cadence.
+                    elif index < len(recv_df) - self.pp + 1:
+                        expected_interval = forward_time + backward_time
+                        if abs(interval / expected_interval - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                    # The last pp - 1 recvs (drain): expect a backward interval.
+                    else:
+                        if abs(interval / backward_time - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                recv_df.insert(recv_df.columns.get_loc(Constant.NOTE) + 1, Constant.NORMAL_INTERVAL, normal_list)
+                batch_id = [i + 1 for i in range(len(recv_df))]
+                recv_df.insert(recv_df.columns.get_loc(Constant.FORWARD_BACKWARD) + 1, Constant.BATCH_ID, batch_id)
+                result_df = pd.concat([result_df, recv_df])
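+            # Tolerance example (hypothetical values): with forward_time = 10_000 ns and
+            # INTERVAL_MULTIPLE_THRESHOLD = 0.2, an interval of 11_500 ns gives
+            # abs(11_500 / 10_000 - 1) = 0.15 < 0.2 -> normal, while 13_000 ns gives 0.3 -> abnormal.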
+            # Last pp stage.
+            elif rank_index == self.pp - 1:
+                expected_common_interval = min(send_df[Constant.INTERVAL])
+                expected_first_send_interval = expected_common_interval + float(recv_df.loc[0, Constant.DUR])
+                logger.info("profiler step %s expected common interval: %s", profiler, expected_common_interval)
+                logger.info("profiler step %s expected first send interval: %s",
+                            profiler, expected_first_send_interval)
+                # On the last stage every send is a backward send.
+                send_df.insert(send_df.columns.get_loc(Constant.PP_STAGE) + 1, Constant.FORWARD_BACKWARD,
+                               [Constant.BACKWARD] * len(send_df))
+                note = [None] * len(send_df)
+                send_df.insert(send_df.columns.get_loc(Constant.FORWARD_BACKWARD) + 1, Constant.NOTE, note)
+                normal_list = []
+                for index, interval in enumerate(send_df[Constant.INTERVAL]):
+                    if index == 0:
+                        if abs(interval / expected_first_send_interval - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                    else:
+                        if abs(interval / expected_common_interval - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                send_df.insert(send_df.columns.get_loc(Constant.NOTE) + 1, Constant.NORMAL_INTERVAL, normal_list)
+                # Insert batch_id.
+                batch_id = [i + 1 for i in range(len(send_df))]
+                send_df.insert(send_df.columns.get_loc(Constant.FORWARD_BACKWARD) + 1, Constant.BATCH_ID, batch_id)
+                result_df = pd.concat([result_df, send_df])
+                # On the last stage every recv is a forward recv.
+                recv_df.insert(recv_df.columns.get_loc(Constant.PP_STAGE) + 1, Constant.FORWARD_BACKWARD,
+                               [Constant.FORWARD] * len(recv_df))
+                note = [Constant.PP_BUBBLE] + [None] * (len(recv_df) - 1)
+                recv_df.insert(recv_df.columns.get_loc(Constant.FORWARD_BACKWARD) + 1, Constant.NOTE, note)
+                normal_list = []
+                for index, interval in enumerate(recv_df[Constant.INTERVAL]):
+                    # The first recv interval is usually 0 and is not checked; even if its dur
+                    # is off, the problem lies in an upstream stage, not in this one.
+                    if index == 0:
+                        normal_list.append(Constant.NORMAL)
+                    else:
+                        if abs(interval / expected_common_interval - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                recv_df.insert(recv_df.columns.get_loc(Constant.NOTE) + 1, Constant.NORMAL_INTERVAL, normal_list)
+                batch_id = [i + 1 for i in range(len(recv_df))]
+                recv_df.insert(recv_df.columns.get_loc(Constant.FORWARD_BACKWARD) + 1, Constant.BATCH_ID, batch_id)
+                result_df = pd.concat([result_df, recv_df])
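+            # Middle-stage layout (hypothetical numbers): with pp = 5, rank_index = 2 and
+            # 8 micro-batches, the 16 sends split as 3 forward (warm-up), 10 alternating
+            # backward/forward, then 3 backward (drain).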
+            # Middle stages.
+            else:
+                # Middle-stage sends: the first pp - rank_index are forward, the last
+                # pp - rank_index are backward, and the (batch - (pp - rank_index)) * 2
+                # in between alternate backward/forward.
+                fw_list = ([Constant.FORWARD] * (self.pp - rank_index)
+                           + [Constant.BACKWARD if i % 2 == 0 else Constant.FORWARD
+                              for i in range(len(send_df) - 2 * (self.pp - rank_index))]
+                           + [Constant.BACKWARD] * (self.pp - rank_index))
+                send_df.insert(send_df.columns.get_loc(Constant.PP_STAGE) + 1, Constant.FORWARD_BACKWARD, fw_list)
+                # Forward and backward compute times.
+                forward_time = min(send_df[Constant.INTERVAL])
+                backward_time = min(send_df[send_df[Constant.FORWARD_BACKWARD] == Constant.BACKWARD][Constant.INTERVAL])
+                logger.info("profiler step %s expected forward and backward time: %s, %s",
+                            profiler, forward_time, backward_time)
+                note = ([None] * (self.pp - rank_index - 1)
+                        + [Constant.PP_BUBBLE]
+                        + [None] * (len(send_df) - self.pp + rank_index))
+                send_df.insert(send_df.columns.get_loc(Constant.FORWARD_BACKWARD) + 1, Constant.NOTE, note)
+                # Expected interval of the first send.
+                expected_first_interval = forward_time * (rank_index + 1)
+                # Expected interval of the trailing backward sends that carry a gap.
+                expected_gap_interval = forward_time + backward_time
+                # Check whether each interval is within tolerance.
+                normal_list = []
+                for index, interval in enumerate(send_df[Constant.INTERVAL]):
+                    # The first send.
+                    if index == 0:
+                        if abs(interval / expected_first_interval - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                    # The trailing backward sends with a gap; there are (drain count - 1) of
+                    # them, since the first drain send still follows at a normal interval.
+                    elif index > len(send_df) - (self.pp - rank_index):
+                        if abs(interval / expected_gap_interval - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                    # The alternating middle part.
+                    else:
+                        normal_interval = forward_time if fw_list[index] == Constant.FORWARD else backward_time
+                        if abs(interval / normal_interval - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                send_df.insert(send_df.columns.get_loc(Constant.NOTE) + 1, Constant.NORMAL_INTERVAL, normal_list)
+                # Insert batch_id: forward and backward micro-batches are numbered independently.
+                batch_id = []
+                f_id = 1
+                b_id = 1
+                for direct in fw_list:
+                    if direct == Constant.FORWARD:
+                        batch_id.append(f_id)
+                        f_id += 1
+                    else:
+                        batch_id.append(b_id)
+                        b_id += 1
+                send_df.insert(send_df.columns.get_loc(Constant.FORWARD_BACKWARD) + 1, Constant.BATCH_ID, batch_id)
+                result_df = pd.concat([result_df, send_df])
+                # Middle-stage recvs: the first pp - rank_index are forward, the last
+                # pp - rank_index are backward, with the middle alternating. The layout
+                # matches the sends, but on the time axis the two are staggered.
+                recv_df.insert(recv_df.columns.get_loc(Constant.PP_STAGE) + 1, Constant.FORWARD_BACKWARD, fw_list)
+                note = ([Constant.PP_GAP]
+                        + [None] * (self.pp - rank_index - 1)
+                        + [Constant.PP_BUBBLE]
+                        + [None] * (len(recv_df) - 2 * (self.pp - rank_index))
+                        + [Constant.PP_GAP] * (self.pp - rank_index - 1))
+                recv_df.insert(recv_df.columns.get_loc(Constant.FORWARD_BACKWARD) + 1, Constant.NOTE, note)
+                normal_list = []
+                for index, interval in enumerate(recv_df[Constant.INTERVAL]):
+                    # The first recv interval is usually 0 (the rank waits from the start for
+                    # the upstream stage), so it is not checked.
+                    if index == 0:
+                        normal_list.append(Constant.NORMAL)
+                    # Recvs 2 to pp - rank_index: expect a forward interval.
+                    elif index < self.pp - rank_index:
+                        if abs(interval / forward_time - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                    # The trailing backward recvs with a gap; (drain count - 1) of them.
+                    elif index > len(recv_df) - (self.pp - rank_index):
+                        if abs(interval / backward_time - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                    # The alternating middle part. Unlike sends, a forward recv is expected
+                    # after a backward compute time and a backward recv after a forward one.
+                    else:
+                        normal_interval = forward_time if fw_list[index] == Constant.BACKWARD else backward_time
+                        if abs(interval / normal_interval - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                recv_df.insert(recv_df.columns.get_loc(Constant.NOTE) + 1, Constant.NORMAL_INTERVAL, normal_list)
+                recv_df.insert(recv_df.columns.get_loc(Constant.FORWARD_BACKWARD) + 1, Constant.BATCH_ID, batch_id)
+                result_df = pd.concat([result_df, recv_df])
+        return result_df.sort_values(by=Constant.TS)
+
+    # VPP pipeline modeling.
+    def vpp_modeling(self, df, start, rank_id):
+        result_df = None
+        rank_index = rank_id // self.step
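+        # Chunk layout: num_of_model_trunk = levels // pp // vpp, e.g. the (hypothetical)
+        # defaults 50 // 5 // 2 = 5 chunks per stage, so the first stage issues
+        # num_of_model_trunk * num_of_micro_batch forward sends per step.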
+        # Handle each profiler step separately.
+        for profiler, start_ts in zip(df[Constant.PROFILER_STEP].unique(), start):
+            # Extract the sends of one profiler step, reset the index, and derive from ts
+            # and dur the idle interval between the previous op's end and the current op's
+            # start. The first op is measured against the earliest op start of the step.
+            send_df = df[df[Constant.OP_TYPE] == Constant.HCOM_SEND]
+            send_df = send_df[send_df[Constant.PROFILER_STEP] == profiler]
+            send_df = send_df.reset_index(drop=True)
+            send_df.insert(send_df.columns.get_loc(Constant.DUR) + 1, Constant.INTERVAL,
+                           send_df[Constant.TS].astype(float))
+            first = float(send_df.loc[0, Constant.INTERVAL])
+            send_df[Constant.INTERVAL] = (send_df[Constant.INTERVAL] - send_df[Constant.INTERVAL].shift(1)
+                                          - send_df[Constant.DUR].shift(1))
+            send_df.loc[0, Constant.INTERVAL] = first - start_ts
+            send_df.insert(send_df.columns.get_loc(Constant.OP_TYPE) + 1, Constant.PP_STAGE,
+                           [rank_index] * len(send_df))
+            # Extract the recvs of the same profiler step.
+            recv_df = df[df[Constant.OP_TYPE] == Constant.HCOM_RECEIVE]
+            recv_df = recv_df[recv_df[Constant.PROFILER_STEP] == profiler]
+            recv_df = recv_df.reset_index(drop=True)
+            recv_df.insert(recv_df.columns.get_loc(Constant.DUR) + 1, Constant.INTERVAL,
+                           recv_df[Constant.TS].astype(float))
+            first = float(recv_df.loc[0, Constant.INTERVAL])
+            recv_df[Constant.INTERVAL] = (recv_df[Constant.INTERVAL] - recv_df[Constant.INTERVAL].shift(1)
+                                          - recv_df[Constant.DUR].shift(1))
+            recv_df.loc[0, Constant.INTERVAL] = first - start_ts
+            recv_df.insert(recv_df.columns.get_loc(Constant.OP_TYPE) + 1, Constant.PP_STAGE,
+                           [rank_index] * len(recv_df))
+            # First pp stage.
+            if rank_index == 0:
+                forward_send_num = self.num_of_model_trunk * self.num_of_micro_batch
+                backward_send_num = self.num_of_model_trunk * (self.num_of_micro_batch - 1)
+                # Estimate the healthy intervals as the median of each segment.
+                forward_time = send_df[Constant.INTERVAL].head(forward_send_num).sort_values().iloc[forward_send_num // 2]
+                backward_time = send_df[Constant.INTERVAL].tail(backward_send_num).sort_values().iloc[backward_send_num // 2]
+
+                logger.info("profiler step %s expected forward and backward time: %s, %s",
+                            profiler, forward_time, backward_time)
+                # On the first stage the leading sends are all forward, the rest all backward.
+                send_direct = [Constant.FORWARD] * forward_send_num + [Constant.BACKWARD] * backward_send_num
+                send_df.insert(send_df.columns.get_loc(Constant.PP_STAGE) + 1, Constant.FORWARD_BACKWARD, send_direct)
+                note = [None] * forward_send_num + [Constant.PP_BUBBLE] + [None] * (backward_send_num - 1)
+                send_df.insert(send_df.columns.get_loc(Constant.FORWARD_BACKWARD) + 1, Constant.NOTE, note)
+                normal_list = []
+                # Check whether each interval is within tolerance.
+                for index, interval in enumerate(send_df[Constant.INTERVAL]):
+                    # Forward part.
+                    if index < forward_send_num:
+                        if abs(interval / forward_time - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                    # First backward send: sits behind the bubble, so it is not checked.
+                    elif index == forward_send_num:
+                        normal_list.append(Constant.NORMAL)
+                    # Backward part.
+                    else:
+                        if abs(interval / backward_time - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                send_df.insert(send_df.columns.get_loc(Constant.NOTE) + 1, Constant.NORMAL_INTERVAL, normal_list)
+                # Insert micro_batch_id and model_trunk_id.
+                batch_id = ([i % self.num_of_micro_batch for i in range(forward_send_num)]
+                            + [i % self.num_of_micro_batch for i in range(backward_send_num)])
+                model_trunk_id = ([i // self.num_of_micro_batch * self.num_of_model_trunk + rank_index
+                                   for i in range(forward_send_num)]
+                                  + [(self.num_of_model_trunk - 1 - (i // self.num_of_micro_batch)) * self.num_of_model_trunk + rank_index
+                                     for i in range(backward_send_num)])
+
+                send_df.insert(send_df.columns.get_loc(Constant.FORWARD_BACKWARD) + 1, Constant.BATCH_ID, batch_id)
+                send_df.insert(send_df.columns.get_loc(Constant.BATCH_ID) + 1, Constant.MODEL_TRUNK_ID, model_trunk_id)
+                if result_df is None:
+                    result_df = send_df
+                else:
+                    result_df = pd.concat([result_df, send_df])
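+                # Worked example (hypothetical): num_of_micro_batch = 3, num_of_model_trunk = 2,
+                # rank_index = 0 -> forward batch_id = [0, 1, 2, 0, 1, 2] and forward
+                # model_trunk_id = [0, 0, 0, 2, 2, 2] (i // 3 * 2 + 0).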
+
+                # On the first stage the leading recvs are all forward, the rest all backward.
+                forward_recv_num = (self.num_of_model_trunk - 1) * self.num_of_micro_batch
+                backward_recv_num = self.num_of_model_trunk * self.num_of_micro_batch
+                recv_direct = [Constant.FORWARD] * forward_recv_num + [Constant.BACKWARD] * backward_recv_num
+                recv_df.insert(recv_df.columns.get_loc(Constant.PP_STAGE) + 1, Constant.FORWARD_BACKWARD, recv_direct)
+                note = ([Constant.PP_BUBBLE]
+                        + [None] * (forward_recv_num - 1)
+                        + [Constant.PP_BUBBLE]
+                        + [None] * (backward_recv_num - 1))
+                recv_df.insert(recv_df.columns.get_loc(Constant.FORWARD_BACKWARD) + 1, Constant.NOTE, note)
+                normal_list = []
+                for index, interval in enumerate(recv_df[Constant.INTERVAL]):
+                    # The first recv: expected interval is forward_time * num_of_model_trunk.
+                    if index == 0:
+                        expected_interval = forward_time * self.num_of_model_trunk
+                        if abs(interval / expected_interval - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                    # The remaining forward recvs and the first backward recv.
+                    elif index <= forward_recv_num:
+                        if abs(interval / forward_time - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                    # Backward part.
+                    else:
+                        if abs(interval / backward_time - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                recv_df.insert(recv_df.columns.get_loc(Constant.NOTE) + 1, Constant.NORMAL_INTERVAL, normal_list)
+                batch_id = ([i % self.num_of_micro_batch for i in range(forward_recv_num)]
+                            + [i % self.num_of_micro_batch for i in range(backward_recv_num)])
+                model_trunk_id = ([(i // self.num_of_micro_batch + 1) * self.num_of_model_trunk + rank_index
+                                   for i in range(forward_recv_num)]
+                                  + [(self.num_of_model_trunk - 1 - (i // self.num_of_micro_batch)) * self.num_of_model_trunk + rank_index
+                                     for i in range(backward_recv_num)])
+
+                recv_df.insert(recv_df.columns.get_loc(Constant.FORWARD_BACKWARD) + 1, Constant.BATCH_ID, batch_id)
+                recv_df.insert(recv_df.columns.get_loc(Constant.BATCH_ID) + 1, Constant.MODEL_TRUNK_ID, model_trunk_id)
+                result_df = pd.concat([result_df, recv_df])
+            # Last pp stage.
+            elif rank_index == self.pp - 1:
+                forward_send_num = (self.num_of_model_trunk - 1) * self.num_of_micro_batch
+                backward_send_num = self.num_of_model_trunk * self.num_of_micro_batch
+                # Estimate the healthy intervals as the median of each segment.
+                forward_time = send_df[Constant.INTERVAL].head(forward_send_num).sort_values().iloc[forward_send_num // 2]
+                backward_time = send_df[Constant.INTERVAL].tail(backward_send_num).sort_values().iloc[backward_send_num // 2]
+
+                logger.info("profiler step %s expected forward and backward time: %s, %s",
+                            profiler, forward_time, backward_time)
+                # The leading sends are all forward, the rest all backward.
+                send_direct = [Constant.FORWARD] * forward_send_num + [Constant.BACKWARD] * backward_send_num
+                send_df.insert(send_df.columns.get_loc(Constant.PP_STAGE) + 1, Constant.FORWARD_BACKWARD, send_direct)
+                note = [None] * forward_send_num + [Constant.PP_BUBBLE] + [None] * (backward_send_num - 1)
+                send_df.insert(send_df.columns.get_loc(Constant.FORWARD_BACKWARD) + 1, Constant.NOTE, note)
+                normal_list = []
+                # Check whether each interval is within tolerance.
+                for index, interval in enumerate(send_df[Constant.INTERVAL]):
+                    # The first forward send.
+                    if index == 0:
+                        expected_interval = forward_time * (rank_index + 1)
+                        if abs(interval / expected_interval - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                    # The remaining forward sends.
+                    elif index < forward_send_num:
+                        if abs(interval / forward_time - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                    # First backward send: sits behind the bubble, so it is not checked.
+                    elif index == forward_send_num:
+                        normal_list.append(Constant.NORMAL)
+                    # Backward part: the comparison is signed here.
+                    else:
+                        # Below the lower bound: likely empty layers, so the compute time is
+                        # noticeably short.
+                        if interval / backward_time - 1 < -Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.EMPTY_LAYER)
+                        # Above the upper bound: a slow device.
+                        elif interval / backward_time - 1 > Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.ABNORMAL)
+                        # Within tolerance.
+                        else:
+                            normal_list.append(Constant.NORMAL)
+                send_df.insert(send_df.columns.get_loc(Constant.NOTE) + 1, Constant.NORMAL_INTERVAL, normal_list)
+                # Insert micro_batch_id and model_trunk_id.
+                batch_id = ([i % self.num_of_micro_batch for i in range(forward_send_num)]
+                            + [i % self.num_of_micro_batch for i in range(backward_send_num)])
+                model_trunk_id = ([i // self.num_of_micro_batch * self.num_of_model_trunk + rank_index
+                                   for i in range(forward_send_num)]
+                                  + [(self.num_of_model_trunk - 1 - (i // self.num_of_micro_batch)) * self.num_of_model_trunk + rank_index
+                                     for i in range(backward_send_num)])
+
+                send_df.insert(send_df.columns.get_loc(Constant.FORWARD_BACKWARD) + 1, Constant.BATCH_ID, batch_id)
+                send_df.insert(send_df.columns.get_loc(Constant.BATCH_ID) + 1, Constant.MODEL_TRUNK_ID, model_trunk_id)
+                if result_df is None:
+                    result_df = send_df
+                else:
+                    result_df = pd.concat([result_df, send_df])
+
+                # The leading recvs are all forward, the rest all backward.
+                forward_recv_num = self.num_of_model_trunk * self.num_of_micro_batch
+                backward_recv_num = (self.num_of_model_trunk - 1) * self.num_of_micro_batch
+                recv_direct = [Constant.FORWARD] * forward_recv_num + [Constant.BACKWARD] * backward_recv_num
+                recv_df.insert(recv_df.columns.get_loc(Constant.PP_STAGE) + 1, Constant.FORWARD_BACKWARD, recv_direct)
+                note = ([Constant.PP_BUBBLE]
+                        + [None] * (forward_recv_num - 1)
+                        + [Constant.PP_BUBBLE]
+                        + [None] * (backward_recv_num - 1))
+                recv_df.insert(recv_df.columns.get_loc(Constant.FORWARD_BACKWARD) + 1, Constant.NOTE, note)
+                normal_list = []
+                for index, interval in enumerate(recv_df[Constant.INTERVAL]):
+                    # The first recv: expected interval is 0.
+                    if index == 0:
+                        normal_list.append(Constant.NORMAL)
+                    # The remaining forward recvs.
+                    elif index < forward_recv_num:
+                        if abs(interval / forward_time - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                    # First backward recv: sits behind the bubble, so it is not checked.
+                    elif index == forward_recv_num:
+                        normal_list.append(Constant.NORMAL)
+                    # The remaining backward recvs.
+                    else:
+                        if abs(interval / backward_time - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                recv_df.insert(recv_df.columns.get_loc(Constant.NOTE) + 1, Constant.NORMAL_INTERVAL, normal_list)
+                batch_id = ([i % self.num_of_micro_batch for i in range(forward_recv_num)]
+                            + [i % self.num_of_micro_batch for i in range(backward_recv_num)])
+                model_trunk_id = ([i // self.num_of_micro_batch * self.num_of_model_trunk + rank_index
+                                   for i in range(forward_recv_num)]
+                                  + [(self.num_of_model_trunk - 2 - (i // self.num_of_micro_batch)) * self.num_of_model_trunk + rank_index
+                                     for i in range(backward_recv_num)])
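+                # Backward trunk example (hypothetical): num_of_model_trunk = 3,
+                # num_of_micro_batch = 2, rank_index = 4 -> the 4 backward recvs map to
+                # model_trunk_id [(3 - 2 - 0) * 3 + 4] * 2 + [(3 - 2 - 1) * 3 + 4] * 2 = [7, 7, 4, 4].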
+
+                recv_df.insert(recv_df.columns.get_loc(Constant.FORWARD_BACKWARD) + 1, Constant.BATCH_ID, batch_id)
+                recv_df.insert(recv_df.columns.get_loc(Constant.BATCH_ID) + 1, Constant.MODEL_TRUNK_ID, model_trunk_id)
+                result_df = pd.concat([result_df, recv_df])
+            # Middle stages.
+            else:
+                forward_send_num = self.num_of_model_trunk * self.num_of_micro_batch
+                backward_send_num = self.num_of_model_trunk * self.num_of_micro_batch
+                # Estimate the healthy intervals as the median of each segment.
+                forward_time = send_df[Constant.INTERVAL].head(forward_send_num).sort_values().iloc[forward_send_num // 2]
+                backward_time = send_df[Constant.INTERVAL].tail(backward_send_num).sort_values().iloc[backward_send_num // 2]
+                logger.info("profiler step %s expected forward and backward time: %s, %s",
+                            profiler, forward_time, backward_time)
+                # The leading sends are all forward, the rest all backward.
+                send_direct = [Constant.FORWARD] * forward_send_num + [Constant.BACKWARD] * backward_send_num
+                send_df.insert(send_df.columns.get_loc(Constant.PP_STAGE) + 1, Constant.FORWARD_BACKWARD, send_direct)
+                note = [None] * forward_send_num + [Constant.PP_BUBBLE] + [None] * (backward_send_num - 1)
+                send_df.insert(send_df.columns.get_loc(Constant.FORWARD_BACKWARD) + 1, Constant.NOTE, note)
+                normal_list = []
+                # Check whether each interval is within tolerance.
+                for index, interval in enumerate(send_df[Constant.INTERVAL]):
+                    # The first forward send.
+                    if index == 0:
+                        expected_interval = forward_time * (rank_index + 1)
+                        if abs(interval / expected_interval - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                    # The remaining forward sends.
+                    elif index < forward_send_num:
+                        if abs(interval / forward_time - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                    # First backward send: sits behind the bubble, so it is not checked.
+                    elif index == forward_send_num:
+                        normal_list.append(Constant.NORMAL)
+                    # Backward part.
+                    else:
+                        if abs(interval / backward_time - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                send_df.insert(send_df.columns.get_loc(Constant.NOTE) + 1, Constant.NORMAL_INTERVAL, normal_list)
+                # Insert micro_batch_id and model_trunk_id.
+                batch_id = ([i % self.num_of_micro_batch for i in range(forward_send_num)]
+                            + [i % self.num_of_micro_batch for i in range(backward_send_num)])
+                model_trunk_id = ([i // self.num_of_micro_batch * self.num_of_model_trunk + rank_index
+                                   for i in range(forward_send_num)]
+                                  + [(self.num_of_model_trunk - 1 - (i // self.num_of_micro_batch)) * self.num_of_model_trunk + rank_index
+                                     for i in range(backward_send_num)])
+                send_df.insert(send_df.columns.get_loc(Constant.FORWARD_BACKWARD) + 1, Constant.BATCH_ID, batch_id)
+                send_df.insert(send_df.columns.get_loc(Constant.BATCH_ID) + 1, Constant.MODEL_TRUNK_ID, model_trunk_id)
+                if result_df is None:
+                    result_df = send_df
+                else:
+                    result_df = pd.concat([result_df, send_df])
+
+                # The leading recvs are all forward, the rest all backward.
+                forward_recv_num = self.num_of_model_trunk * self.num_of_micro_batch
+                backward_recv_num = self.num_of_model_trunk * self.num_of_micro_batch
+                recv_direct = [Constant.FORWARD] * forward_recv_num + [Constant.BACKWARD] * backward_recv_num
+                recv_df.insert(recv_df.columns.get_loc(Constant.PP_STAGE) + 1, Constant.FORWARD_BACKWARD, recv_direct)
+                note = ([Constant.PP_BUBBLE]
+                        + [None] * (forward_recv_num - 1)
+                        + [Constant.PP_BUBBLE]
+                        + [None] * (backward_recv_num - 1))
+                recv_df.insert(recv_df.columns.get_loc(Constant.FORWARD_BACKWARD) + 1, Constant.NOTE, note)
+                normal_list = []
+                for index, interval in enumerate(recv_df[Constant.INTERVAL]):
+                    # The first recv: the rank waits from the start, so there is no interval.
+                    if index == 0:
+                        normal_list.append(Constant.NORMAL)
+                    # The remaining forward recvs and the first backward recv.
+                    elif index <= forward_recv_num:
+                        if abs(interval / forward_time - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                    # Backward part.
+                    else:
+                        if abs(interval / backward_time - 1) < Constant.INTERVAL_MULTIPLE_THRESHOLD:
+                            normal_list.append(Constant.NORMAL)
+                        else:
+                            normal_list.append(Constant.ABNORMAL)
+                recv_df.insert(recv_df.columns.get_loc(Constant.NOTE) + 1, Constant.NORMAL_INTERVAL, normal_list)
+                batch_id = ([i % self.num_of_micro_batch for i in range(forward_recv_num)]
+                            + [i % self.num_of_micro_batch for i in range(backward_recv_num)])
+                model_trunk_id = ([i // self.num_of_micro_batch * self.num_of_model_trunk + rank_index
+                                   for i in range(forward_recv_num)]
+                                  + [(self.num_of_model_trunk - 1 - (i // self.num_of_micro_batch)) * self.num_of_model_trunk + rank_index
+                                     for i in range(backward_recv_num)])
+                recv_df.insert(recv_df.columns.get_loc(Constant.FORWARD_BACKWARD) + 1, Constant.BATCH_ID, batch_id)
+                recv_df.insert(recv_df.columns.get_loc(Constant.BATCH_ID) + 1, Constant.MODEL_TRUNK_ID, model_trunk_id)
+                result_df = pd.concat([result_df, recv_df])
+        return result_df.sort_values(by=Constant.TS)
diff --git a/profiler/msprof_analyze/prof_common/constant.py b/profiler/msprof_analyze/prof_common/constant.py
index 5601a9f77fe0332d8258664a40aa449fb6aafbd7..db7b34536877e91afb8b44d0b4d225a0b67ff43a 100644
--- a/profiler/msprof_analyze/prof_common/constant.py
+++ b/profiler/msprof_analyze/prof_common/constant.py
@@ -480,3 +480,30 @@ class Constant(object):
     UINT32_MASK = 0xffffffff
 
     INVALID_RANK_NUM = 4294967295
+
+    SEND = "send"
+    RECEIVE = "receive"
+
+    GLOBAL_ID = "global id"
+    TS = "ts"
+    DUR = "dur"
+    INTERVAL = "interval"
+    PP_STAGE = "pp stage"
+    OP_TYPE = "op type"
+    PROFILER_STEP = "profiler step"
+    FORWARD_BACKWARD = "forward or backward"
+    NORMAL_INTERVAL = "anomaly detection"
+    NOTE = "note"
+    BATCH_ID = "batch id"
+    MODEL_TRUNK_ID = "model trunk id"
+
+    FORWARD = "forward"
+    BACKWARD = "backward"
+
+    NORMAL = "normal"
+    ABNORMAL = "abnormal"
+    EMPTY_LAYER = "empty layers"
+    PP_GAP = "gap"
+    PP_BUBBLE = "bubble"
+
+    # Relative tolerance: an interval counts as normal when it is within +/-20% of the
+    # expected value, i.e. abs(interval / expected - 1) < 0.2.
+    INTERVAL_MULTIPLE_THRESHOLD = 0.2
diff --git a/profiler/msprof_analyze/prof_exports/pp_modeling_export.py b/profiler/msprof_analyze/prof_exports/pp_modeling_export.py
new file mode 100644
index 0000000000000000000000000000000000000000..151c00c97e3aec4da6e9171939ebe9c3e44c921f
--- /dev/null
+++ b/profiler/msprof_analyze/prof_exports/pp_modeling_export.py
@@ -0,0 +1,38 @@
+# Copyright (c) 2025, Huawei Technologies Co., Ltd.
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from msprof_analyze.prof_exports.base_stats_export import BaseStatsExport
+
+QUERY = """
+    SELECT
+        si.value AS groupName,
+        co.startNs AS ts,
+        co.endNs - co.startNs AS dur,
+        sii.value AS opName,
+        op.value AS opType
+    FROM
+        COMMUNICATION_OP co
+    JOIN STRING_IDS si ON co.groupName = si.id
+    JOIN STRING_IDS sii ON co.opName = sii.id
+    JOIN STRING_IDS op ON co.opType = op.id
+"""
+
+
+class PPModelingExport(BaseStatsExport):
+
+    def __init__(self, db_path, recipe_name):
+        super().__init__(db_path, recipe_name)
+        self._query = QUERY
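+
+# Usage sketch (hypothetical path; read_export_db is inherited from BaseStatsExport):
+#   df = PPModelingExport("./rank_0/ascend_pytorch_profiler.db", "pp_modeling").read_export_db()
+#   -> one row per communication op with columns groupName, ts, dur, opName, opType.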