From b757e4bd051136b47a4f546aecc5ca6a0b5ed75a Mon Sep 17 00:00:00 2001 From: huxianglong Date: Thu, 10 Jul 2025 15:01:31 +0800 Subject: [PATCH] 930_1 --- .../framework_adapter/modellink_adapter.py | 10 +++ .../tinker/model/adapter_utils.py | 4 +- .../tinker/profiler/block_profiler.py | 61 +++++++++---------- .../profile_classes.py} | 51 +++++++++++++++- .../tinker/profiler/profile_space.py | 41 +------------ .../tinker/search/cost_model.py | 35 +++++------ profiler/msprof_analyze/tinker/search/data.py | 2 +- .../tinker/search/gen_modellink_plan.py | 2 +- .../msprof_analyze/tinker/utils/block_args.py | 2 +- .../msprof_analyze/tinker/utils/constant.py | 4 ++ 10 files changed, 113 insertions(+), 99 deletions(-) rename profiler/msprof_analyze/tinker/{utils/profile_args.py => profiler/profile_classes.py} (66%) diff --git a/profiler/msprof_analyze/tinker/framework_adapter/modellink_adapter.py b/profiler/msprof_analyze/tinker/framework_adapter/modellink_adapter.py index dada9bfff..2446a4aee 100644 --- a/profiler/msprof_analyze/tinker/framework_adapter/modellink_adapter.py +++ b/profiler/msprof_analyze/tinker/framework_adapter/modellink_adapter.py @@ -72,6 +72,10 @@ class ModelLinkAdapter(ABC): def pre_mem_profile_backward_step(self): pass + @abstractmethod + def deallocate_output_tensor(self, outputs, deallocate_outputs): + pass + class ModelLinkAdapter11(ModelLinkAdapter): @@ -142,6 +146,12 @@ class ModelLinkAdapter11(ModelLinkAdapter): args.model_type = ModelType.encoder_or_decoder return model_provider() + def deallocate_output_tensor(self, outputs, deallocate_outputs): + from megatron.core.pipeline_parallel import schedules + for _, origin_output in outputs.items(): + schedules.deallocate_output_tensor(origin_output, deallocate_outputs) + + class ModelLinkAdapter10(ModelLinkAdapter11): def initialize(self): diff --git a/profiler/msprof_analyze/tinker/model/adapter_utils.py b/profiler/msprof_analyze/tinker/model/adapter_utils.py index 669cf7b93..fb7c8cd7c 100644 --- a/profiler/msprof_analyze/tinker/model/adapter_utils.py +++ b/profiler/msprof_analyze/tinker/model/adapter_utils.py @@ -27,10 +27,8 @@ from tinker.model.block_adapters import BlockAdapter, legacy_block_adapters, mco from tinker.utils.config import TINKER_DIR from tinker.utils.logger import logger from tinker.utils.utils import write_lines, project_root, find_keywords_line_idx, get_lines, read_file, path_to_package +from tinker.utils.constant import MODULE_NAME, PYTHON_STANDARD_INDENT -PYTHON_STANDARD_INDENT = ' ' * 4 - -MODULE_NAME = 'genned_block_forward' block_adapter_file_path = os.path.join(TINKER_DIR, f'model/{MODULE_NAME}.py') diff --git a/profiler/msprof_analyze/tinker/profiler/block_profiler.py b/profiler/msprof_analyze/tinker/profiler/block_profiler.py index 64d73722b..6209e2eeb 100644 --- a/profiler/msprof_analyze/tinker/profiler/block_profiler.py +++ b/profiler/msprof_analyze/tinker/profiler/block_profiler.py @@ -34,9 +34,9 @@ from tinker.framework_adapter.modellink_adapter import get_adapter, ModelLinkAda from tinker.megatron_patch.arguments import get_num_layers from tinker.megatron_patch.microbatches import rebuild_num_microbatches_calculator from tinker.model.block_infos import get_model_block_infos, BlockInfo +from tinker.profiler.profile_classes import ProfileArgs, InitTensorInfo from tinker.utils.logger import init_profile_log, profile_logger from tinker.utils.npu_timer import NPUTimer -from tinker.utils.profile_args import ProfileArgs from tinker.utils.utils import byte_to_mb start_profiling_time = time.time() @@ -76,9 +76,6 @@ class TinkerProfiler: self.bwd_timer = NPUTimer() self._set_vars() - # shapes for input tensors and extra tensors - self.input_shape_dict = {} # type: Dict[str, Dict[str, List[int]]] - self.input_extra_dict = {} # type: Dict[str, Dict[str, List[int]]] # size of inputs and outputs self.input_size_dict = {} self.output_size_dict = {} @@ -115,7 +112,7 @@ class TinkerProfiler: return output_data, new_mem_reserved - mem_reserved, new_mem_allocated - mem_allocated, new_mem_reserved @staticmethod - def extract_input_tensors(tensors, InitTensorInfo): + def extract_input_tensors(tensors): res = {} for key, value in tensors.items(): @@ -199,11 +196,9 @@ class TinkerProfiler: output_grads = [] # add one more dummy op for each output tensor for output_tensor in origin_outputs: - pool_op = torch.nn.Identity() # exactly dummy,原本按tensor维度使用AdaptiveMaxPool2~1d,不确定是否影响 - output_tensor_ = pool_op(output_tensor) - output_tensor_grad = torch.randn(output_tensor_.size(), requires_grad=False, device=self.DEVICE, + output_tensor_grad = torch.randn(output_tensor.size(), requires_grad=False, device=self.DEVICE, dtype=self.grad_type) - origin_grad = torch.autograd.grad(outputs=output_tensor_, grad_outputs=output_tensor_grad, + origin_grad = torch.autograd.grad(outputs=output_tensor, grad_outputs=output_tensor_grad, inputs=output_tensor, allow_unused=False, retain_graph=False) output_grads.append(origin_grad[0]) @@ -264,7 +259,6 @@ class TinkerProfiler: # 模拟loss grad origin_outputs, output_grads = self.get_outputs_and_grads(output_data) - origin_outputs = sum([torch.mean(origin_output) for origin_output in origin_outputs]) # 反向预热 self.adapter.pre_time_profile_backward_step() @@ -323,13 +317,10 @@ class TinkerProfiler: self.adapter.pre_mem_profile_backward_step() torch.cuda.synchronize() - if "embedding" in block_info.name: + backward_input_tensors = get_backward_input_tensors(block_info, input_data) + if backward_input_tensors is None: torch.autograd.backward(outputs, grad_tensors=output_grads, retain_graph=True) else: - backward_input_tensors = [] - for input_name in input_data: - if input_data[input_name] is not None and input_data[input_name].requires_grad: - backward_input_tensors.append(input_data[input_name]) self.adapter.backward_step(backward_input_tensors, outputs, output_grads) mem_reserved_bwd = byte_to_mb(torch.cuda.memory_reserved()) @@ -356,7 +347,6 @@ class TinkerProfiler: # 2. 准备反向所需输出 outputs, output_grads = self.get_outputs_and_grads(output_data) # 3. 将输出置为标量张量,以满足框架要求(Pseudo-deallocate (i.e., set to scalar) the output tensor's '.data' field.) - # todo 考虑封装并移入adapter deallocate_output_tensor outputs = sum([torch.mean(origin_output) for origin_output in outputs]) # 4. 运行反向并测量时间 if index >= self.args.prof_warmup_times: @@ -371,15 +361,21 @@ class TinkerProfiler: return args = self.args profile_logger.info(f"====== PROFILING RESULTS ({self.profile_args.model}{self.profile_args.hint}) ======") - result_title = ["block_name", "forward-compute", "backward-compute", "input_size", "output_size", "weights", + result_title = ["config", "block_name", "forward-compute", "backward_compute", "input_size", "output_size", + "weights", "activations", "fwd_reserved", "bwd_reserved"] - with open(os.path.join(args.prof_path, self.profile_args.file_name), 'w') as f_result: + csv_path = os.path.join(args.prof_path, "profiled_data.csv") + csv_exists = os.path.isfile(csv_path) + with open(csv_path, 'a') as f_result: f_csv = csv.writer(f_result) - f_csv.writerow(result_title) + if not csv_exists: + f_csv.writerow(result_title) for block_info in block_list: # fwd_time, bwd_time, input_size, output_size, weight_size, activations, reserved_fwd, reserved_bwd - datas = [block_info.name] + [f'{float(info):.3f}' for info in self.profiled_results[block_info.name]] - f_csv.writerow(datas) + data = [f'{float(info):.3f}' for info in self.profiled_results[block_info.name]] + row = [self.profile_args.file_name[:-4]] + [block_info.name] + data + f_csv.writerow(row) + f_csv.writerow([]) def update_micro_batch_size(self, new_mbs): # todo 丑陋的刷新mbs方式 @@ -402,7 +398,6 @@ class TinkerProfiler: def update_tensor_map(self, block_infos: List[BlockInfo], mbs): forward_output = None - InitTensorInfo = namedtuple('InitTensorInfo', ['shape', 'requires_grad', 'device', 'dtype', 'element_size']) # 初始化 if self.block_tensor_map is None: first_input = self._get_first_input() @@ -414,8 +409,8 @@ class TinkerProfiler: input_tensors = block_info.get_input_tensors(first_input, forward_output) forward_output = wrapped_block(input_tensors) - extract_input_tensors_info = self.extract_input_tensors(input_tensors, InitTensorInfo) - extract_output_tensors_info = self.extract_input_tensors(forward_output, InitTensorInfo) + extract_input_tensors_info = self.extract_input_tensors(input_tensors) + extract_output_tensors_info = self.extract_input_tensors(forward_output) block_tensor_map[block_info.name] = extract_input_tensors_info, extract_output_tensors_info self.block_tensor_map = block_tensor_map self.last_mbs = mbs @@ -444,14 +439,14 @@ class TinkerProfiler: self.output_size_dict = {} self.profiled_results = {} - def run_profile(self, task, already_oom=False): + def run_profile(self, task_mbs, already_oom=False): args = self.args block_infos = get_model_block_infos(self.adapter) # 重新初始化,避免不同task间的影响;同时相较于原来的unique_name形式,节约了不少显存 self.reset_params() try: - self.update_micro_batch_size(task["mbs"]) + self.update_micro_batch_size(task_mbs) except AssertionError: traceback.print_exc() profile_logger.error( @@ -459,7 +454,7 @@ class TinkerProfiler: return False # 遍历一轮block,自动生成block中的张量信息,供后续 get_inputs使用 - self.update_tensor_map(block_infos, task["mbs"]) + self.update_tensor_map(block_infos, task_mbs) # infer the data size according to block specs self.infer_data_size(block_infos) @@ -539,7 +534,7 @@ def main(): args = adapter.get_args() # get profiling tasks # "task"s are defined by unique {model, size, mbs} pairs - all_prof_tasks = [] + all_tasks_mbs = [] # todo 当前基于传入配置profiling, model_prof_config 可以专注完成搜索范围指定的工作 model_name = args.prof_model_name # todo 待拓展单次拉起torchrun的profiling维度 @@ -553,16 +548,16 @@ def main(): # 规避部分 mbs oom if mbs >= args.prof_mbs_limit: break - all_prof_tasks.append({"model": model_name, "size": model_size, "mbs": mbs}) # 一个prof_task的tp是固定的 + all_tasks_mbs.append(mbs) # 一个prof_task的tp是固定的 oom_record = False # run profiling tasks - for prof_task in all_prof_tasks: - if prof_task["mbs"] >= args.prof_mbs_limit: + for task_mbs in all_tasks_mbs: + if task_mbs >= args.prof_mbs_limit: oom_record = True - run_well = tinker_profiler.run_profile(prof_task, oom_record) + run_well = tinker_profiler.run_profile(task_mbs, oom_record) if not run_well and not oom_record: - profile_logger.info(f"[Tinker-Profiler] OOM when mbs={prof_task['mbs']}") + profile_logger.info(f"[Tinker-Profiler] OOM when mbs={task_mbs}") oom_record = True end_profiling_time = time.time() diff --git a/profiler/msprof_analyze/tinker/utils/profile_args.py b/profiler/msprof_analyze/tinker/profiler/profile_classes.py similarity index 66% rename from profiler/msprof_analyze/tinker/utils/profile_args.py rename to profiler/msprof_analyze/tinker/profiler/profile_classes.py index 2b51a63c3..b0b6f7b11 100644 --- a/profiler/msprof_analyze/tinker/utils/profile_args.py +++ b/profiler/msprof_analyze/tinker/profiler/profile_classes.py @@ -16,7 +16,45 @@ import re from dataclasses import dataclass, fields -from tinker.profiler.profile_space import ScriptArgs +import torch + + +@dataclass(frozen=True) +class ScriptArgs: + tp: int = 1 + sp: int = 0 + ep: int = 1 + + @property + def cmd_text_list(self): + return [str(value) for value in self.__dict__.values()] + + def items(self): + return self.__dict__.items() + + def is_legal(self, max_npu, args): + p_times = 1 + for p in [self.tp, self.ep]: + if p > 1: + p_times *= p + illegal = [ + max_npu % p_times, + self.tp == 1 and self.sp, + args.hidden_size % self.tp, + args.ffn_hidden_size % self.tp, + args.num_attention_heads % self.tp, + args.group_query_attention and args.num_query_groups % self.tp, + args.seq_length % self.tp and self.sp, + self.ep > 1 and not hasattr(args, 'num_experts') + ] + if not hasattr(args, 'num_experts'): + return not any(illegal) + # MoE + illegal.extend([ + self.tp > 1 and not self.sp, + args.num_experts % self.ep, + ]) + return not any(illegal) @dataclass(frozen=True) @@ -76,4 +114,13 @@ class ProfileArgs(ScriptArgs): if k in exclude_info: continue text += f'{pre_split}{k}{post_split}{v}' - return text \ No newline at end of file + return text + + +@dataclass(frozen=True) +class InitTensorInfo: + shape: torch.Size + requires_grad: bool + device: torch.device + dtype: torch.dtype + element_size: int diff --git a/profiler/msprof_analyze/tinker/profiler/profile_space.py b/profiler/msprof_analyze/tinker/profiler/profile_space.py index c0dcd5a38..4cc7a435a 100644 --- a/profiler/msprof_analyze/tinker/profiler/profile_space.py +++ b/profiler/msprof_analyze/tinker/profiler/profile_space.py @@ -29,6 +29,7 @@ import datetime from typing import List, Optional, Tuple from tinker.profiler import gen_model_structure +from tinker.profiler.profile_classes import ScriptArgs from tinker.model.adapter_utils import gen_block_adapter from tinker.utils.config import TINKER_DIR from tinker.utils.logger import init_log, logger @@ -69,44 +70,6 @@ class MemoryImpact: return cls._opposite(impact) -@dataclass(frozen=True) -class ScriptArgs: - tp: int = 1 - sp: int = 0 - ep: int = 1 - - @property - def cmd_text_list(self): - return [str(value) for value in self.__dict__.values()] - - def items(self): - return self.__dict__.items() - - def is_legal(self, max_npu, args): - p_times = 1 - for p in [self.tp, self.ep]: - if p > 1: - p_times *= p - illegal = [ - max_npu % p_times, - self.tp == 1 and self.sp, - args.hidden_size % self.tp, - args.ffn_hidden_size % self.tp, - args.num_attention_heads % self.tp, - args.group_query_attention and args.num_query_groups % self.tp, - args.seq_length % self.tp and self.sp, - self.ep > 1 and not hasattr(args, 'num_experts') - ] - if not hasattr(args, 'num_experts'): - return not any(illegal) - # MoE - illegal.extend([ - self.tp > 1 and not self.sp, - args.num_experts % self.ep, - ]) - return not any(illegal) - - @dataclass class Feature: name: str @@ -392,4 +355,4 @@ def run(args: argparse.Namespace): # 输出profile情况 logger.info(f'Profile Space Total Time: {time.time() - start_time:.2f}') logger.info(f'Profile Data Saved at {dir_path}') - return dir_path \ No newline at end of file + return dir_path diff --git a/profiler/msprof_analyze/tinker/search/cost_model.py b/profiler/msprof_analyze/tinker/search/cost_model.py index bfd4f157c..d5303343e 100644 --- a/profiler/msprof_analyze/tinker/search/cost_model.py +++ b/profiler/msprof_analyze/tinker/search/cost_model.py @@ -27,10 +27,10 @@ from typing import Dict, List, Optional, Tuple, Union sys.path.append("./") +from tinker.profiler.profile_classes import ProfileArgs from tinker.search.arguments import print_args, preprocess_args from tinker.search.data import TaskParam, ParallelStrategy, Metrics from tinker.utils.block_args import BlockArgs, BlockCost, DetailedInfo -from tinker.utils.profile_args import ProfileArgs from tinker.utils.utils import load_infos, print_result from tinker.utils.logger import logger, init_log @@ -68,14 +68,6 @@ class ProfiledData: def add_data(self, data: Tuple[float, ...], features: FeaturesType, block_name: str): self._block_data[features][block_name] = BlockCost(*data) - def add_data_from_csv(self, file_path: str, profile_args: ProfileArgs): - with open(file_path, 'r') as f: - src_data = csv.reader(f) - _ = next(src_data) # 跳过表头 - for row in src_data: - block_name = row[0] - data = tuple(float(data) for data in row[1:]) - self.add_data(data, profile_args, block_name) def get_data_by_args(self, profiled_args: ProfileArgs): return self._block_data[profiled_args] @@ -328,16 +320,21 @@ class TinkerCostModel: def _read_block_time(self, data_path: str): """基于profiler,生成searcher参数范围;或者直接基于每个tp sp mbs [ep],去衍化dp pp zero""" - for file_path in glob.glob(os.path.join(data_path, '*.csv')): - # 获取文件名(不包括后缀) - filename_without_suffix = os.path.splitext(os.path.basename(file_path))[0] - # todo 尝试优化此处`p2p`硬编码 - if "p2p" in filename_without_suffix: - continue - # 通过文件名获取对应并行策略 - profile_args = ProfileArgs.new_from_file_name(filename_without_suffix) - self.profiled_data.add_data_from_csv(file_path, profile_args) - self._block_data_ready = True + file_path = os.path.join(data_path, 'profiled_data.csv') + try: + with open(file_path, 'r') as f: + data = csv.reader(f) + next(data, None) + for row in data: + if all(not field.strip() for field in row): + continue + filename_without_suffix = row[0] + block_name = row[1] + data = tuple(float(data) for data in row[2:]) + profile_args = ProfileArgs.new_from_file_name(filename_without_suffix) + self.profiled_data.add_data(data, profile_args, block_name) + except Exception as e: + raise RuntimeError(f'Load profiled data: {file_path} failed.') from e def _read_band_time(self, data_path): # todo 当前p2p.csv的表头在读取数据时无用,且囿于2的幂次,考虑优化 diff --git a/profiler/msprof_analyze/tinker/search/data.py b/profiler/msprof_analyze/tinker/search/data.py index 1d8445657..2d6528fa8 100644 --- a/profiler/msprof_analyze/tinker/search/data.py +++ b/profiler/msprof_analyze/tinker/search/data.py @@ -20,7 +20,7 @@ from dataclasses import dataclass from typing import List, Dict from tinker.utils.block_args import BlockArgs -from tinker.utils.profile_args import ProfileArgs +from tinker.profiler.profile_classes import ProfileArgs @dataclass(frozen=True) diff --git a/profiler/msprof_analyze/tinker/search/gen_modellink_plan.py b/profiler/msprof_analyze/tinker/search/gen_modellink_plan.py index 1692d5efd..ca710cdd2 100644 --- a/profiler/msprof_analyze/tinker/search/gen_modellink_plan.py +++ b/profiler/msprof_analyze/tinker/search/gen_modellink_plan.py @@ -31,7 +31,7 @@ from tinker.utils.logger import logger, init_log from tinker.search.cost_model import TinkerCostModel from tinker.search.arguments import print_args, preprocess_args from tinker.utils.convert_to_trainsh import convert_to_train_script -from tinker.utils.profile_args import ProfileArgs +from tinker.profiler.profile_classes import ProfileArgs MAX_FLOAT = 1.0e9 PRECISION_REDUNDANCY = 1.0e-7 diff --git a/profiler/msprof_analyze/tinker/utils/block_args.py b/profiler/msprof_analyze/tinker/utils/block_args.py index 243326f67..8764da07f 100644 --- a/profiler/msprof_analyze/tinker/utils/block_args.py +++ b/profiler/msprof_analyze/tinker/utils/block_args.py @@ -17,7 +17,7 @@ from dataclasses import dataclass from typing import List, Dict from tinker.utils.logger import logger -from tinker.utils.profile_args import ProfileArgs +from tinker.profiler.profile_classes import ProfileArgs @dataclass diff --git a/profiler/msprof_analyze/tinker/utils/constant.py b/profiler/msprof_analyze/tinker/utils/constant.py index 77f181759..81a8a4d4d 100644 --- a/profiler/msprof_analyze/tinker/utils/constant.py +++ b/profiler/msprof_analyze/tinker/utils/constant.py @@ -37,6 +37,10 @@ VERSION_ALIASES: Dict[str, Version] = { "1.0.RC3": Version.MindSpeed_LLM_1_0_rc3, } +PYTHON_STANDARD_INDENT = ' ' * 4 + +MODULE_NAME = 'genned_block_forward' + def version_parse(version_str: str) -> Version: normalized_str = version_str.strip().upper() -- Gitee