diff --git a/profiler/msprof_analyze/test/st/tinker/test_gen_modellink_plan.py b/profiler/msprof_analyze/test/st/tinker/test_gen_modellink_plan.py index 95944c8fdc0a274b05756950875450ba6bfd9d1d..6728a84f57d27d89d9591affe2e602c259da3812 100644 --- a/profiler/msprof_analyze/test/st/tinker/test_gen_modellink_plan.py +++ b/profiler/msprof_analyze/test/st/tinker/test_gen_modellink_plan.py @@ -28,7 +28,7 @@ from profiler.msprof_analyze.tinker.utils.config import parse_args sys.path.append("/") from profiler.msprof_analyze.tinker.utils.utils import project_root, read_file -from profiler.msprof_analyze.tinker.search import gen_modellink_plan +from profiler.msprof_analyze.tinker.search import optimize from profiler.msprof_analyze.tinker.search.arguments import print_args Strategy = namedtuple('Strategy', ['tp', 'pp', 'dp', 'sp', 'zero', 'mbs', 'num_layer_list', 'rc']) @@ -101,7 +101,7 @@ class TestGenModellinkPlan(unittest.TestCase): os.path.join(ST_DATA_PATH, 'standard_profiled_data', 'llama2-7b'), '-output', save_path] patch = mock.patch('profiler.msprof_analyze.tinker.search.arguments.print_args').start() - gen_modellink_plan.run(parse_args()) + optimize.run(parse_args()) log_file_path = patch.call_args[0][0].log_file log_output = read_file(log_file_path) diff --git a/profiler/msprof_analyze/tinker/CHANGELOG.md b/profiler/msprof_analyze/tinker/CHANGELOG.md index 4a069962fcbfa7e93d748358ca5e4f033d37fada..f82b9161ec8a0764bb4b8504c080be3155bd9f69 100644 --- a/profiler/msprof_analyze/tinker/CHANGELOG.md +++ b/profiler/msprof_analyze/tinker/CHANGELOG.md @@ -39,7 +39,7 @@ 1.2.0 -- 调整`zero`切分内容:2阶优化器状态、全精度权重参数 +- 调整`dist_opt`切分内容:2阶优化器状态、全精度权重参数 - 调整`reserved`内存仿真逻辑 - 增加`attention_mask`内存占用仿真 - 调整`recompute`内存仿真,开启时保留`input`内存占用 \ No newline at end of file diff --git a/profiler/msprof_analyze/tinker/ReadMe.md b/profiler/msprof_analyze/tinker/ReadMe.md index 6222590222d225fc45da2f20aa7b7682d6c16f92..c92ca35c7cb362536e47a4affe244dbbff80decd 100644 --- a/profiler/msprof_analyze/tinker/ReadMe.md +++ b/profiler/msprof_analyze/tinker/ReadMe.md @@ -80,7 +80,7 @@ python /xxx/tinker/tinker_auto_parallel.py --mode all --config_path /xxx/tinker/ - `--simu_pp`: 指定pp值(可选,默认值1) - `--simu_ep`: 指定ep值(可选,默认值1) - `--simu_sp`: 指定sp值(可选,默认值0) -- `--zero`: 指定ZeRO模式(0关或1开) +- `--dist_opt`: 指定distributed optimizer(ZeRO)模式(0关或1开) - `-mbs`: 指定micro batch size - `--num_layer_list`: 模型分层列表,例如4,4,4,4 - `--recompute`: 是否开启重计算(0关或1开) diff --git a/profiler/msprof_analyze/tinker/parameter_config.json b/profiler/msprof_analyze/tinker/parameter_config.json index 0b20e81e6d039a671108e3e041498b4a2ed3374a..7385002e29f4902b4c2ba98cae11bd1a07ee20e9 100644 --- a/profiler/msprof_analyze/tinker/parameter_config.json +++ b/profiler/msprof_analyze/tinker/parameter_config.json @@ -30,7 +30,7 @@ "simu_pp": 1, "simu_ep": 1, "simu_sp": 0, - "zero": 0, + "dist_opt": 0, "micro_batch_size": 1, "num_layer_list": null, "recompute": 0 diff --git a/profiler/msprof_analyze/tinker/profiler/block_profiler.py b/profiler/msprof_analyze/tinker/profiler/block_profiler.py index 64d73722b5a645292a11e9ced74e3289ce34bb5b..ccd0a289fb0c028a4c49cc0a03466a326b460ed2 100644 --- a/profiler/msprof_analyze/tinker/profiler/block_profiler.py +++ b/profiler/msprof_analyze/tinker/profiler/block_profiler.py @@ -356,7 +356,7 @@ class TinkerProfiler: # 2. 准备反向所需输出 outputs, output_grads = self.get_outputs_and_grads(output_data) # 3. 将输出置为标量张量,以满足框架要求(Pseudo-deallocate (i.e., set to scalar) the output tensor's '.data' field.) 
-            # todo 考虑封装并移入adapter deallocate_output_tensor
+            # TODO 考虑封装并移入adapter deallocate_output_tensor
             outputs = sum([torch.mean(origin_output) for origin_output in outputs])
             # 4. 运行反向并测量时间
             if index >= self.args.prof_warmup_times:
@@ -382,7 +382,7 @@ class TinkerProfiler:
             f_csv.writerow(datas)
 
     def update_micro_batch_size(self, new_mbs):
-        # todo 丑陋的刷新mbs方式
+        # TODO 丑陋的刷新mbs方式
        envs_dict = self.profile_args.__dict__
         envs_dict['mbs'] = new_mbs
         self.profile_args = self.profile_args.update_mbs(new_mbs)
@@ -540,9 +540,9 @@ def main():
     # get profiling tasks
     # "task"s are defined by unique {model, size, mbs} pairs
     all_prof_tasks = []
-    # todo 当前基于传入配置profiling, model_prof_config 可以专注完成搜索范围指定的工作
+    # TODO 当前基于传入配置profiling, model_prof_config 可以专注完成搜索范围指定的工作
     model_name = args.prof_model_name
-    # todo 待拓展单次拉起torchrun的profiling维度
+    # TODO 待拓展单次拉起torchrun的profiling维度
     model_prof_config = {"mbs": [1, 2, 4, 8]}
     model_size = args.prof_model_size
     if args.prof_mbs_list is None:
diff --git a/profiler/msprof_analyze/tinker/profiler/profile_space.py b/profiler/msprof_analyze/tinker/profiler/profile_space.py
index c0dcd5a38e8e6fcf01811e2aa15f0e78cde423b5..7ed68970f4c3c8aa27ab87ac9424ee244d2f5f87 100644
--- a/profiler/msprof_analyze/tinker/profiler/profile_space.py
+++ b/profiler/msprof_analyze/tinker/profiler/profile_space.py
@@ -71,6 +71,14 @@ class MemoryImpact:
 
 @dataclass(frozen=True)
 class ScriptArgs:
+    """
+    基础脚本参数
+
+    属性:
+    tp: int - tensor parallel size
+    sp: int - sequence parallel size
+    ep: int - expert parallel size
+    """
     tp: int = 1
     sp: int = 0
     ep: int = 1
diff --git a/profiler/msprof_analyze/tinker/search/cost_model.py b/profiler/msprof_analyze/tinker/search/cost_model.py
index bfd4f157c8ab7907825f2d0fa53364ce39dd85b4..8488ecd72f06f6e0e8f17efce36b3398d573a873 100644
--- a/profiler/msprof_analyze/tinker/search/cost_model.py
+++ b/profiler/msprof_analyze/tinker/search/cost_model.py
@@ -28,10 +28,11 @@ from typing import Dict, List, Optional, Tuple, Union
 sys.path.append("./")
 
 from tinker.search.arguments import print_args, preprocess_args
-from tinker.search.data import TaskParam, ParallelStrategy, Metrics
+from tinker.search.data import TaskParam, SearchArgs, ResultArgs, Metrics
+from tinker.search.process import ResultOutputHandler
 from tinker.utils.block_args import BlockArgs, BlockCost, DetailedInfo
 from tinker.utils.profile_args import ProfileArgs
-from tinker.utils.utils import load_infos, print_result
+from tinker.utils.utils import load_infos, convert_to_pp_stage_block_idx
 from tinker.utils.logger import logger, init_log
 
 FeaturesType = ProfileArgs
@@ -106,8 +107,8 @@ class TinkerCostModel:
         self._band_data_ready = None
         self._block_data_ready = None
         self.profiled_data = ProfiledData()
-        # todo 考虑把读数据的模块单独提一个类,或者相关read逻辑放进 ProfiledData
-        self._read_block_time(args.profiled_data_path)
+        # TODO 考虑把读数据的模块单独提一个类,或者相关read逻辑放进 ProfiledData
+        self._read_block_data(args.profiled_data_path)
         self.inter_band = None  # type: Optional[List[float]]
         self.intra_band = None  # type: Optional[List[float]]
         self._read_band_time(args.profiled_data_path)
@@ -152,7 +153,7 @@ class TinkerCostModel:
         :param pp: 本次训练流水线并行度,也即总流水线stage数量
         :param stage: 当前stage序号,首stage序号为0
         :param micro_batch_num: 在流水线上的微批个数,即gbs // dp // mbs
-        :return: 该stage需爆粗你的峰值前向激活值份数
+        :return: 该stage需保存的峰值前向激活值份数
         """
         return min(pp - stage, micro_batch_num)
 
@@ -207,7 +208,7 @@ class TinkerCostModel:
         if data_size < 0:
             raise ValueError(f'communicate data size invalid: {data_size} <= 0')
         if data_size == 0:
-            # todo 这个情况,得区分:是不存在通信,还是有一个空通信。前者取0,后者取传包最小时间
+            # TODO 
这个情况,得区分:是不存在通信,还是有一个空通信。前者取0,后者取传包最小时间 return 0 bands = self.inter_band if inter_node else self.intra_band index = int(math.log(data_size, 2)) @@ -276,13 +277,14 @@ class TinkerCostModel: mem_costs = [] num_npu_before = 0 profile_args = param.blocks[0].profile_args - micro_batch_num = self.args.global_batch_size // param.cost_model_args['dp'] // profile_args.mbs + micro_batch_num = self.args.global_batch_size // param.search_args.dp // profile_args.mbs # 提前计算各stage因内存碎片而产生的reserved内存峰值 - reserved_mem_costs = TinkerCostModel.calc_reserved_mem_costs(param.pp, param.blocks) + pp = param.search_args.pp + reserved_mem_costs = TinkerCostModel.calc_reserved_mem_costs(pp, param.blocks) # 逐 pp stage 计算时空开销 - for p in range(param.pp): + for p in range(pp): if detail: - logger.debug(f'stage {p}'.center(80, '=')) + logger.info(f'stage {p}'.center(80, '=')) detailed_info = DetailedInfo() detail_infos.append(detailed_info) time_cost, mem_cost = 0, 0 @@ -293,10 +295,10 @@ class TinkerCostModel: # 逐block计算性能 for block_idx in range(head_idx, tail_idx + 1): block = param.blocks[block_idx] - block.num_fwd_act = TinkerCostModel.get_num_fwd_act(param.pp, p, micro_batch_num) + block.num_fwd_act = TinkerCostModel.get_num_fwd_act(pp, p, micro_batch_num) mem_cost += block.block_mem() num_npu_before, time_cost, input_comm, output_comm = self.get_stage_status( - param.blocks[head_idx: tail_idx + 1], num_npu_before, p == 0, p == param.pp - 1) + param.blocks[head_idx: tail_idx + 1], num_npu_before, p == 0, p == pp - 1) # stage 重计算内存、内存碎片 recompute_mem = TinkerCostModel.calc_recompute_mem(param.blocks[head_idx:tail_idx + 1]) @@ -310,28 +312,28 @@ class TinkerCostModel: _ = block.block_time(detail=detail, detail_info=detailed_info) _ = block.block_mem(detail=detail, detail_info=detailed_info) detailed_info.set_and_print(input_comm, output_comm, recompute_mem, reserved_mem_costs[p], mem_cost) - logger.debug('stage %d total Memory: %.3f MB', p, mem_cost) + logger.info('stage %d total Memory: %.3f MB', p, mem_cost) bubble_time = sum(time_costs) profile_args = param.blocks[0].profile_args - micro_batch_num = self.args.global_batch_size // param.cost_model_args['dp'] // profile_args.mbs + micro_batch_num = self.args.global_batch_size // param.search_args.dp // profile_args.mbs time_costs = [bubble_time + (micro_batch_num - 1) * stage_time for stage_time in time_costs] if detail: # 输出时间仿真器细分数据 - logger.debug(f'Time Cost with Bubble'.center(80, '=')) - logger.debug('Sum(unit time): %.3f ms', bubble_time / 1000) + logger.info(f'Time Cost with Bubble'.center(80, '=')) + logger.info('Sum(unit time): %.3f ms', bubble_time / 1000) for time_cost, detail_info in zip(time_costs, detail_infos): detail_info.print_time(bubble_time, micro_batch_num, time_cost) # 参数还原,避免后续影响 self._refresh_blocks(param) return Metrics(time_costs, mem_costs, max(time_costs), max(mem_costs)) - def _read_block_time(self, data_path: str): - """基于profiler,生成searcher参数范围;或者直接基于每个tp sp mbs [ep],去衍化dp pp zero""" + def _read_block_data(self, data_path: str): + """基于profiler,生成searcher参数范围;或者直接基于每个tp sp mbs [ep],去衍化dp pp dist_opt""" for file_path in glob.glob(os.path.join(data_path, '*.csv')): # 获取文件名(不包括后缀) filename_without_suffix = os.path.splitext(os.path.basename(file_path))[0] - # todo 尝试优化此处`p2p`硬编码 + # TODO 尝试优化此处`p2p`硬编码 if "p2p" in filename_without_suffix: continue # 通过文件名获取对应并行策略 @@ -340,7 +342,7 @@ class TinkerCostModel: self._block_data_ready = True def _read_band_time(self, data_path): - # todo 当前p2p.csv的表头在读取数据时无用,且囿于2的幂次,考虑优化 + # TODO 
当前p2p.csv的表头在读取数据时无用,且囿于2的幂次,考虑优化 intra_band_file = os.path.join(data_path, "p2p_intra_node.csv") inter_band_file = os.path.join(data_path, "p2p_inter_node.csv") logger.info(intra_band_file) @@ -364,26 +366,6 @@ class TinkerCostModel: self._band_data_ready = True -def convert_to_pp_stage_block_idx(num_layer_list: List[int], num_all_blocks_len: int): - """ - 格式转换 - :param num_layer_list: 一种可能的划分方式, num_layer_list中的元素为每个stage的长度 - :param num_all_blocks_len: 加上头尾blocks的长度 - :return: - """ - interval_layer_list = list() - start_num = 1 - for stage_length in num_layer_list: - interval_layer_list.append((start_num, start_num + stage_length - 1)) - start_num += stage_length - # 处理首尾 - first_tuple = interval_layer_list[0] - interval_layer_list[0] = (0, first_tuple[1]) - last_tuple = interval_layer_list[-1] - interval_layer_list[-1] = (last_tuple[0], num_all_blocks_len - 1) - return interval_layer_list - - def run(args: argparse.Namespace): if args.mode != 'simulate': return @@ -417,7 +399,7 @@ def run(args: argparse.Namespace): raise ValueError("incorrect gbs={}, dp={}, mbs={}, the former must be divided into the latter two.".format( args.global_batch_size, dp, args.micro_batch_size )) - cost_model_args = dict(dp=dp, zero=args.zero, recompute=args.recompute) + cost_model_args = dict(dp=dp, dist_opt=args.dist_opt, recompute=args.recompute) # 当前所有block统一cost_model_args,尤其是recompute for block in pred_blocks: block.update_cost_model_args(cost_model_args) @@ -429,16 +411,17 @@ def run(args: argparse.Namespace): split_way = list(map(int, args.num_layer_list.split(','))) intervals = convert_to_pp_stage_block_idx(split_way, len(pred_blocks)) # 3.5 计算开销,传入detail开关 - task_param = TaskParam(pp=args.simu_pp, cost_model_args=cost_model_args, profiled_args=pred_profiled_args, - blocks=pred_blocks) - strategy = ParallelStrategy(task_param.profiled_args.mbs, task_param.cost_model_args.get('dp'), - args.global_batch_size, task_param.pp, - task_param.profiled_args.tp, task_param.profiled_args.sp, - task_param.profiled_args.ep, task_param.cost_model_args['zero'], - task_param.cost_model_args['recompute'], task_param.profiled_args.is_moe, - args.num_layer_list) + search_args = SearchArgs(pp=args.simu_pp, **cost_model_args, **pred_profiled_args.__dict__) + task_param = TaskParam(search_args=search_args, blocks=pred_blocks) + strategy = ResultArgs( + gbs=args.global_batch_size, + num_layers_list=args.num_layer_list, + blocks=task_param.blocks, + **task_param.search_args.__dict__ + ) metrics = cost_model.calculate_cost(task_param, intervals, args.detail) - print_result(args, (strategy, metrics)) + result_output_handler = ResultOutputHandler(args, cost_model, [(strategy, metrics)]) + result_output_handler.print_and_write_to_file(1, save=False) end_time = time.time() - logger.info(f"[TOTAL TIME] {end_time - start_time} s.") \ No newline at end of file + logger.info(f"[TOTAL TIME] {end_time - start_time} s.") diff --git a/profiler/msprof_analyze/tinker/search/data.py b/profiler/msprof_analyze/tinker/search/data.py index 1d8445657269096e021a8fe01f4f2a79dc9587e3..f367396a272aca2ab0a2b0296e12f43250f76613 100644 --- a/profiler/msprof_analyze/tinker/search/data.py +++ b/profiler/msprof_analyze/tinker/search/data.py @@ -16,33 +16,63 @@ """ search 过程中的数据类 """ -from dataclasses import dataclass -from typing import List, Dict +from dataclasses import dataclass, fields +from typing import List, Optional from tinker.utils.block_args import BlockArgs from tinker.utils.profile_args import ProfileArgs @dataclass(frozen=True) -class 
ParallelStrategy:
+class SearchArgs(ProfileArgs):
     """
-    完整并行策略参数,用作key
+    搜索参数,继承自ProfileArgs
+
+    属性:
+    mbs: int - micro batch size,微批次大小
+    tp: int - tensor parallel size
+    sp: int - sequence parallel size
+    ep: int - expert parallel size
+    dp: Optional[int] - dp值,表示数据并行的副本数
+    pp: Optional[int] - pp值,表示pipeline并行的阶段数
+    recompute: Optional[int] - 重计算
+    dist_opt: Optional[int] - 分布式优化器
     """
-    mbs: int
-    dp: int
-    gbs: int
-    pp: int
-    tp: int
-    sp: int
-    ep: int
-    zero: int
-    rc: int
-    is_moe: int
-    # 数字间用逗号隔离,如 1,3,5,7
-    num_layers: str
+    dp: Optional[int] = None  # dp值,表示数据并行的副本数
+    pp: Optional[int] = None  # pp值,表示pipeline并行的阶段数
+    recompute: Optional[int] = None  # 重计算
+    dist_opt: Optional[int] = None  # 分布式优化器
+
+    def __post_init__(self):
+        '''检查'recompute'和'dist_opt'这两个属性的值是否在{0, 1}中'''
+        valid_values = {0, 1}
+        for f in filter(lambda f: f.name in {'recompute', 'dist_opt'}, fields(self)):
+            if getattr(self, f.name) not in valid_values:
+                raise ValueError(f"Invalid value for {f.name}, expected 0 or 1, got {getattr(self, f.name)}")
 
 
 @dataclass(frozen=True)
+class ResultArgs(SearchArgs):
+    """
+    搜索结果参数,继承自SearchArgs
+
+    属性:
+    mbs: int - micro batch size,微批次大小
+    tp: int - tensor parallel size
+    sp: int - sequence parallel size
+    ep: int - expert parallel size
+    dp: Optional[int] - dp值,表示数据并行的副本数
+    pp: Optional[int] - pp值,表示pipeline并行的阶段数
+    recompute: Optional[int] - 重计算
+    dist_opt: Optional[int] - 分布式优化器
+    num_layers_list: str - 表示不同阶段的神经网络层数配置列表
+    """
+    num_layers_list: Optional[str] = None  # 示例: 4,4,4,4,网络层数配置列表,每个元素对应不同阶段的层数
+    gbs: Optional[int] = None  # 全局的批次大小,表示一次处理的数据量
+    blocks: Optional[List[BlockArgs]] = None
+
+
+@dataclass()
 class Metrics:
     """
     性能数据
@@ -51,6 +81,7 @@ class Metrics:
     mem_costs: list
     time_cost: float
     mem_cost: float
+    tokens_per_npu_per_sec: Optional[float] = None
 
 
 @dataclass(frozen=True)
@@ -58,9 +89,7 @@ class TaskParam:
     """
     任务维度的参数,作为非均匀区间划分时的传参
     """
-    pp: int
-    cost_model_args: Dict[str, int]
-    profiled_args: ProfileArgs
+    search_args: SearchArgs
     blocks: List[BlockArgs]
 
 
@@ -72,4 +101,4 @@ class StageData:
     num_npu_before: int
     stage_time_max_min: float
     num_layer_list: list
-    stage_mem_max: float
\ No newline at end of file
+    stage_mem_max: float
diff --git a/profiler/msprof_analyze/tinker/search/gen_modellink_plan.py b/profiler/msprof_analyze/tinker/search/optimize.py
similarity index 71%
rename from profiler/msprof_analyze/tinker/search/gen_modellink_plan.py
rename to profiler/msprof_analyze/tinker/search/optimize.py
index 1692d5efde01da552f0ad444ba6a539e67bea3d4..1753f5ce3986e7a686c7a8d64a4dbe8399336b0d 100644
--- a/profiler/msprof_analyze/tinker/search/gen_modellink_plan.py
+++ b/profiler/msprof_analyze/tinker/search/optimize.py
@@ -16,7 +16,6 @@
 import abc
 import itertools
 import logging
-import os.path
 import sys
 import time
 import argparse
@@ -25,12 +24,12 @@ from multiprocessing import Pool
 from typing import Iterator, List, Tuple
 
 sys.path.append("./")
-from tinker.search.data import ParallelStrategy, Metrics, TaskParam, StageData
-from tinker.utils.utils import read_file, load_infos, print_result
-from tinker.utils.logger import logger, init_log
+from tinker.search.data import SearchArgs, ResultArgs, Metrics, TaskParam, StageData
 from tinker.search.cost_model import TinkerCostModel
 from tinker.search.arguments import print_args, preprocess_args
-from tinker.utils.convert_to_trainsh import convert_to_train_script
+from tinker.search.process import ResultOutputHandler
+from tinker.utils.utils import read_file, load_infos, 
convert_to_num_layers +from tinker.utils.logger import logger, init_log from tinker.utils.profile_args import ProfileArgs MAX_FLOAT = 1.0e9 @@ -44,33 +43,25 @@ class Optimizer(abc.ABC): self.user_args = user_args @abc.abstractmethod - def search_parallel_strategies(self) -> List[Tuple[ParallelStrategy, Metrics]]: + def search_parallel_strategies(self) -> List[Tuple[ResultArgs, Metrics]]: pass @abc.abstractmethod - def process(self, strategy_metrics_pairs: List[Tuple[ParallelStrategy, Metrics]]): + def process_result(self, strategy_metrics_pairs: List[Tuple[ResultArgs, Metrics]]): pass def optimize(self): - strategies_to_metrics = self.search_parallel_strategies() - self.process(strategies_to_metrics) + result_pairs = self.search_parallel_strategies() + self.process_result(result_pairs) -class ModelLinkOptimizer(Optimizer): +class TinkerOptimizer(Optimizer): def __init__(self, cost_model: TinkerCostModel, user_args): super().__init__(cost_model, user_args) self.script = self.read_pretrain_file() - @staticmethod - def _convert_to_num_layers(interval_layer_list): - num_layer_list = [interval[1] - interval[0] + 1 for interval in interval_layer_list] - num_layer_list[0] -= 1 - num_layer_list[-1] -= 2 - num_layers = ','.join(map(str, num_layer_list)) - return num_layers - - def search_parallel_strategies(self) -> List[Tuple[ParallelStrategy, Metrics]]: + def search_parallel_strategies(self) -> List[Tuple[ResultArgs, Metrics]]: task_params = self._gen_task_params() strategy_metrics_list = self._parallel_task(task_params) flattened_list = list(itertools.chain(*strategy_metrics_list)) @@ -92,27 +83,18 @@ class ModelLinkOptimizer(Optimizer): logger.info('result will store in %s', self.user_args.config_save_path) return script - def process(self, strategy_metrics_pairs: List[Tuple[ParallelStrategy, Metrics]]): - # step1 相同 time_cost, 不同 mem_cost, 取最小的mem_cost; - if not strategy_metrics_pairs: + def process_result(self, result_pairs: List[Tuple[ResultArgs, Metrics]]): + if not result_pairs: logger.info("no feasible config, exit") return - sorted_by_time = sorted(strategy_metrics_pairs, key=lambda item: (item[1].time_cost, item[1].mem_cost)) - - # step2 转换脚本,取top10 - for config_rank, strategy_metrics_pair in enumerate(sorted_by_time): - print_result(self.user_args, strategy_metrics_pair) - # 1 相同 time_cost,不同 mem_cost,取最小的mem_cost;2 只存 top 10的pretrain脚本 - if config_rank + 1 <= 10: - convert_to_train_script(self.user_args, strategy_metrics_pair, config_rank + 1, self.script) - pass + result_output_handler = ResultOutputHandler(self.user_args, self.cost_model, result_pairs, self.script) - # step3 打印最优结果 - best_strategy = sorted_by_time[0] + # 对于result_pairs进行排序 + result_output_handler.sort() - logger.info('Best: ') - print_result(self.user_args, best_strategy) + # 日志打屏以及写入sh文件, 默认取top 10, 完整结果存入csv + result_output_handler.print_and_write_to_file(10) def _gen_task_params(self): # task 是什么定义? 
task的目的是为了生成 num_intervals_list,共同组成最终的并行策略 @@ -124,34 +106,45 @@ class ModelLinkOptimizer(Optimizer): # 这里的逻辑应该是除 num_layer_list 之外的所有参数 for profiled_args in profiled_args_list: profiled_args: ProfileArgs - # 计算当前profiled_args下的 pp zero dp 取值范围 + # 计算当前profiled_args下的 pp dist_opt dp 取值范围 num_npus = args.num_npus - # todo 该类约束统一处理 + # TODO 该类约束统一处理 if num_npus % profiled_args.tp: continue # stage变量的搜索空间生成 pp_space = TinkerCostModel.get_pp_range(num_npus, args.num_layers, profiled_args) # type: Iterator - zero_space = [0] if isinstance(profiled_args.ep, int) and profiled_args.ep > 1 else [0, 1] - recompute_space = [0, 1] # todo 支持逐block重计算,当前使用统一full recompute + dist_opt_space = [0] if isinstance(profiled_args.ep, int) and profiled_args.ep > 1 else [0, 1] + recompute_space = [0, 1] # TODO 支持逐block重计算,当前使用统一full recompute # 生成任务队列 - for pp, zero, recompute in itertools.product(pp_space, zero_space, recompute_space): + for pp, dist_opt, recompute in itertools.product(pp_space, dist_opt_space, recompute_space): dp = num_npus // pp // profiled_args.tp local_batch_size = dp * profiled_args.mbs - if args.global_batch_size % local_batch_size or dp == 1 and zero: + if args.global_batch_size % local_batch_size or dp == 1 and dist_opt: continue - cost_model_args = dict(dp=dp, zero=zero, recompute=recompute) + search_args = SearchArgs( + pp=pp, + dp=dp, + recompute=recompute, + dist_opt=dist_opt, + **profiled_args.__dict__ # 继承 profiled_args 的所有属性 + ) blocks = self.cost_model.init_blocks(profiled_args, self.user_args.num_layers) for block in blocks: - block.update_cost_model_args(cost_model_args) + block.update_cost_model_args({ + "dp": dp, + "dist_opt": search_args.dist_opt, + "recompute": search_args.recompute + }) # 头尾处理不做recompute for block in [blocks[0], blocks[-2], blocks[-1]]: block.recompute = False - task_param = TaskParam(pp, cost_model_args, profiled_args, blocks) + + task_param = TaskParam(search_args=search_args, blocks=blocks) task_params.append(task_param) return task_params - def _parallel_task(self, task_params: List[TaskParam]) -> List[ParallelStrategy]: + def _parallel_task(self, task_params: List[TaskParam]): # 寻找最优的几种划分方式 if self.user_args.cpus <= 1: results = [self._memory_and_rounds_search(task_param) for task_param in task_params] @@ -166,7 +159,7 @@ class ModelLinkOptimizer(Optimizer): best_results = [] next_memory_limit = self.user_args.memory_limit # 计算保留内存 - reserved_mems = TinkerCostModel.calc_reserved_mem_costs(task_param.pp, task_param.blocks) + reserved_mems = TinkerCostModel.calc_reserved_mem_costs(task_param.search_args.pp, task_param.blocks) # 动态计算memory_limits while search_round > 0: memory_limits = [next_memory_limit - reserved_mem for reserved_mem in reserved_mems] @@ -174,13 +167,13 @@ class ModelLinkOptimizer(Optimizer): if not interval_layer_list: break - num_layers = self._convert_to_num_layers(interval_layer_list) - strategy = ParallelStrategy(task_param.profiled_args.mbs, task_param.cost_model_args.get('dp'), - self.user_args.global_batch_size, task_param.pp, - task_param.profiled_args.tp, task_param.profiled_args.sp, - task_param.profiled_args.ep, task_param.cost_model_args['zero'], - task_param.cost_model_args['recompute'], task_param.profiled_args.is_moe, - num_layers) + num_layers = convert_to_num_layers(interval_layer_list) + strategy = ResultArgs( + gbs=self.user_args.global_batch_size, + num_layers_list=num_layers, + blocks=task_param.blocks, + **task_param.search_args.__dict__ + ) metrics = self.cost_model.calculate_cost(task_param, 
interval_layer_list)
             best_results.append((strategy, metrics))
             search_round -= 1
@@ -198,18 +191,19 @@
         """
         num_all_blocks = len(param.blocks)
         profile_args = param.blocks[0].profile_args
-        micro_batch_num = self.user_args.global_batch_size // param.cost_model_args['dp'] // profile_args.mbs
+        micro_batch_num = self.user_args.global_batch_size // param.search_args.dp // profile_args.mbs
+        pp = param.search_args.pp
 
         # 头尾处理不流水线切分约束
         head_min_num = 1
         end_min_num = 2
         # dp[i][j] i:block_num,j: stage_idx
         dp = [[StageData(num_npu_before=0, stage_time_max_min=float('inf'), num_layer_list=list(), stage_mem_max=0)]
-              * (param.pp + 1) for _ in range(num_all_blocks + 1)]
+              * (pp + 1) for _ in range(num_all_blocks + 1)]
         # 动规方程定义:前i个block划分为j个stage的所有方式中,最大time_cost的最小值
         dp[0][0] = StageData(num_npu_before=0, stage_time_max_min=0, num_layer_list=list(), stage_mem_max=0)
 
-        for j in range(1, param.pp + 1):
+        for j in range(1, pp + 1):
             for i in range(1, num_all_blocks + 1):
 
                 if i <= head_min_num:
@@ -219,11 +213,11 @@
                 for k in range(i - 1, -1, -1):
                     current_blocks = param.blocks[k: i]
                     # 约束二:
-                    if j == param.pp and len(current_blocks) <= end_min_num:
+                    if j == pp and len(current_blocks) <= end_min_num:
                         continue
 
                     # 使用j-1,提前固定乘数
-                    num_fwd_act = TinkerCostModel.get_num_fwd_act(param.pp, j - 1, micro_batch_num)
+                    num_fwd_act = TinkerCostModel.get_num_fwd_act(pp, j - 1, micro_batch_num)
                     current_stage_mem = TinkerCostModel.get_stage_mem_cost(current_blocks, num_fwd_act)
                     # 使用stage对应内存上限判断当前是否可以提前退出
                     if current_stage_mem >= memory_limits[j - 1]:
@@ -232,7 +226,7 @@
                     # 计算第j个stage的时间
                     current_max_status = dp[k][j - 1]
                     num_npu_before, time_cost, _, _ = self.cost_model.get_stage_status(
-                        current_blocks, current_max_status.num_npu_before, j == 1, j == param.pp
+                        current_blocks, current_max_status.num_npu_before, j == 1, j == pp
                     )
                     # 当前最佳的切分方式
                     current_max_time_cost = max(dp[k][j - 1].stage_time_max_min, time_cost)
@@ -244,14 +238,14 @@
                     dp[i][j] = StageData(num_npu_before=num_npu_before, stage_time_max_min=current_max_time_cost,
                                          num_layer_list=current_list, stage_mem_max=current_max_mem_cost)
 
-        best_result = dp[num_all_blocks][param.pp]
+        best_result = dp[num_all_blocks][pp]
         if not best_result.num_layer_list:
             return None
         # 根据分割点,计算划分区间
         points = best_result.num_layer_list
         points.append(num_all_blocks)
         dynamic_stage_intervals = list()
-        for i in range(param.pp):
+        for i in range(pp):
             start_idx = points[i]
             end_idx = points[i + 1]
             dynamic_stage_intervals.append((start_idx, end_idx - 1))
@@ -278,7 +272,7 @@ def run(args: argparse.Namespace):
     initialize(args)
     # 1. 实例化CostModel
     cost_model = TinkerCostModel(args)
-    optimizer = ModelLinkOptimizer(cost_model=cost_model, user_args=args)
+    optimizer = TinkerOptimizer(cost_model=cost_model, user_args=args)
     optimizer.optimize()
     end_time = time.time()
-    logger.info(f"[TOTAL TIME] {end_time - start_time} s.")
\ No newline at end of file
+    logger.info(f"[TOTAL TIME] {end_time - start_time} s.")
diff --git a/profiler/msprof_analyze/tinker/search/process.py b/profiler/msprof_analyze/tinker/search/process.py
new file mode 100644
index 0000000000000000000000000000000000000000..dfbc6a3c3e9ed068730989137e100d8cabda3d6a
--- /dev/null
+++ b/profiler/msprof_analyze/tinker/search/process.py
@@ -0,0 +1,365 @@
+# Copyright (c) 2025, Huawei Technologies Co., Ltd.
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+from typing import List, Tuple, Dict
+from dataclasses import asdict, fields
+
+from tinker.search.data import ResultArgs, Metrics, TaskParam, SearchArgs
+from tinker.utils.utils import extract_between, del_line, write_lines, convert_to_pp_stage_block_idx
+from tinker.utils.logger import logger
+
+
+class ResultOutputHandler:
+    """
+    ResultOutputHandler类,用于处理输出结果。
+    """
+
+    def __init__(self, args, cost_model, result_pairs: List[Tuple[ResultArgs, Metrics]], script=None):
+        self.user_args = args
+        self.cost_model = cost_model
+        self.script = script
+        self.result_pairs = result_pairs
+        self.result_pairs_sorted = None
+        self._calculate_tokens()
+
+
+    @staticmethod
+    def _write_strategy_to_file(script: str, strategy_param):
+        """
+        替换用户脚本中的指定参数,若脚本中未找到拉起命令,则原样返回
+        :param script: 用户预训练脚本内容
+        :param strategy_param: tinker搜出的并行策略参数
+        :return: 处理后的脚本行列表
+        """
+
+        # 直接建两个分类
+        params_need_to_deleted = [
+            '--tensor-model-parallel-size ', '--micro-batch-size ', '--global-batch-size ',
+            '--sequence-parallel ', '--use-distributed-optimizer ', '--recompute-method ',
+            '--recompute-granularity ', '--recompute-num-layers ', '--pipeline-model-parallel-size ',
+            '--num-layer-list', '--context-parallel-size ', '--context-parallel-algo ',
+            '--ulysses-degree-in-cp ', '--cp-attention-mask-type ', '--use-cp-send-recv-overlap ',
+            '--kv-head-repeat-before-uly-alltoall ', '--num-layers-per-virtual-pipeline-stage ',
+            '--overlap-grad-reduce ', '--overlap-param-gather ']
+
+        params_need_to_append = [f'--tensor-model-parallel-size {strategy_param.tp} \\',
+                                 f'--micro-batch-size {strategy_param.mbs} \\',
+                                 f'--global-batch-size {strategy_param.gbs} \\',
+                                 '--overlap-grad-reduce \\',
+                                 f'--pipeline-model-parallel-size {strategy_param.pp} \\']
+
+        if strategy_param.pp > 1:
+            params_need_to_append.append(f'--num-layer-list {strategy_param.num_layers_list} \\')
+
+        if strategy_param.sp:
+            params_need_to_append.append('--sequence-parallel \\')
+
+        if strategy_param.dist_opt:
+            params_need_to_append.append('--use-distributed-optimizer \\')
+
+        if strategy_param.recompute:
+            params_need_to_append.append('--recompute-method uniform \\')
+            params_need_to_append.append('--recompute-granularity full \\')
+            params_need_to_append.append('--recompute-num-layers 1 \\')
+
+        # 插入首部:
+        format_params = ['    ' + value for value in params_need_to_append]
+
+        tinker_strategy_params = '\n'.join(format_params)
+        tinker_search_args_str = 'TINKER_SEARCH_ARGS'
+        tinker_strategy_params = f"{tinker_search_args_str}=\"\n{tinker_strategy_params}\n\""
+
+        # 直接删完
+        res = del_line(params_need_to_deleted, script)
+
+        # 往首行插入 TINKER_SEARCH_ARGS
+        res = '\n'.join([tinker_strategy_params, res])
+
+        # 找到 torchrun xxx .py 或 python xxx.py,以便插入tinker参数
+        run_key_words = ['torchrun', 'python']
+        hit_key_word = None
+
+        # 先找一圈,找不到则直接返回原脚本
+        for run_key_word in run_key_words:
+            cmd_content = extract_between(run_key_word, 'py', res)
+            if cmd_content is not None:
+                hit_key_word = run_key_word
+                break
+
+        if hit_key_word is None:
+            # 可能是空白文件或者没以上 run_key_words 的脚本,直接返回
+            return res.splitlines()
+
+        num_skip_line = len(cmd_content.splitlines())
+        hit_key_word_idx = -1
+        res_lines = res.splitlines()
+        for idx, line in enumerate(res_lines):
+            if hit_key_word in line:
+                hit_key_word_idx = idx
+                break
+
+        # 在行尾命令行处,加上tinker 的参数
+        insert_idx = hit_key_word_idx + num_skip_line
+        tinker_args_in_cmd = ''.join(['    ${', tinker_search_args_str, '} \\'])
+        res_lines.insert(insert_idx, tinker_args_in_cmd)
+        return res_lines
+
+
+    @staticmethod
+    def _get_result_dict(rank: int, result_pair: Tuple[ResultArgs, Metrics]):
+        strategy, metric = result_pair
+        # 提取 info_text 中的指标值到 value_dict
+        info_values = {
+            "token_per_npu_per_sec": round(metric.tokens_per_npu_per_sec, 1),
+            "time_cost": round(metric.time_cost / 1000, 3),
+            "mem_cost": round(metric.mem_cost, 2)
+        }
+
+        # 构造完整的 value_dict
+        value_dict = {
+            **{
+                "rank": rank + 1,
+                "tp": strategy.tp,
+                "pp": strategy.pp,
+                "dp": strategy.dp,
+                "sp": strategy.sp,
+                "ep": strategy.ep,
+                "dist_opt": strategy.dist_opt,
+                "mbs": strategy.mbs,
+                "num_layer_list": list(map(int, strategy.num_layers_list.split(','))),
+                "recompute": strategy.recompute,
+            },
+            **info_values  # 合并 info_text 中的指标
+        }
+
+        return value_dict
+
+
+    def sort(self):
+        '''排序
+        对于result_pairs,按照tokens_per_npu_per_sec的值从大到小排序,
+        如果tokens_per_npu_per_sec的值相同,则根据time_cost的值从小到大排序,
+        如果time_cost的值也相同,则根据mem_cost的值从小到大排序
+        '''
+        self.result_pairs_sorted = sorted(
+            self.result_pairs,
+            key=lambda item: (
+                -item[1].tokens_per_npu_per_sec,
+                item[1].time_cost,
+                item[1].mem_cost
+            )
+        )
+
+
+    def print_and_write_to_file(self, top_num, save=True):
+        """
+        将结果写入文件并打印日志表格
+        :param top_num: 需要打印和写入文件的最优配置数量
+        :param save: 是否将结果写入文件
+        :return: 无
+        :raise ValueError: 如果结果对的有序列表为空,抛出异常
+        """
+        # 如果不需要保存结果,将结果对的有序列表设置为结果对列表
+        if not save:
+            self.result_pairs_sorted = self.result_pairs
+        # 如果结果对的有序列表为空,抛出异常
+        if not self.result_pairs_sorted:
+            raise ValueError('Please sort the result first!')
+
+        # 初始化日志表格宽度字典
+        table_widths = {}
+        # 遍历结果对的有序列表:写入csv文件,将前top_num个配置存入sh文件,并更新日志表格宽度用于后续打印
+        for config_rank, result_pair in enumerate(self.result_pairs_sorted):
+            # 如果需要保存结果,将结果对写入csv文件
+            if save:
+                self._write_to_csv(result_pair)
+            # 获取结果对的字典形式
+            value_dict = self._get_result_dict(config_rank, result_pair)
+            if config_rank == 0:
+                # 初始化日志表格宽度, 以表头的宽度为初始值
+                table_widths = {k: len(str(k)) for k in value_dict.keys()}
+            if config_rank + 1 <= top_num:
+                # 只存前top_num个pretrain脚本
+                if save:
+                    self._write_to_sh(result_pair, config_rank + 1, self.script)
+                # 更新表格宽度
+                for k, v in value_dict.items():
+                    temp_width = table_widths.get(k)
+                    table_widths[k] = max(temp_width, len(str(v)))
+
+        # 打印日志表格,呈现前top_num个配置
+        if save:
+            self._print_table(table_widths, self.result_pairs_sorted[:top_num], f"top {top_num} configs", True)
+        else:
+            self._print_table(table_widths, self.result_pairs_sorted, "simulate config", False)
+
+        # simulate模式下无需打印最优配置
+        if save:
+            # 打印最优配置
+            best_strategy = self.result_pairs_sorted[0]
+            self._print_table(table_widths, [best_strategy], "Best config", False)
+
+
+    def _calculate_tokens(self):
+        for result_pair in self.result_pairs:
+            _, metric = result_pair
+            tokens_per_npu_per_sec = (
+                self.user_args.seq_length * self.user_args.global_batch_size /
+                self.user_args.num_npus /
+                metric.time_cost * 1000000
+            )
+            metric.tokens_per_npu_per_sec = tokens_per_npu_per_sec
+
+
+    def _write_to_csv(self, result_pair: Tuple[ResultArgs, Metrics]):
+        """
+        将某result_pair写入csv文件
+
+        参数:
+        result_pair: 一个元组,包含ResultArgs和Metrics两个对象
+
+        返回值:
+        无
+
+        异常描述:
+        如果写入文件失败,会抛出RuntimeError异常
+        """
+        # 从result_pair中提取strategy和metric对象
+        strategy, metric = result_pair
+
+        # 提取属性字典
+        strategy_dict = asdict(strategy)
+        # 过滤掉不需要的属性
+        filtered_strategy = {
+            k: v
+            for k, v in strategy_dict.items()
+            if k not in {"algo", "model", "gbs", "blocks"}
+        }
+        # 提取metric对象的属性字典
+        metric_dict = asdict(metric)
+
+        # 拼接表头(字段名)和字段值
+        header = list(filtered_strategy.keys()) + list(metric_dict.keys())
+        # 将strategy的属性值转换为字符串
+        strategy_values = []
+        for k, v in filtered_strategy.items():
+            if k == "num_layers_list":
+                strategy_values.append(f"[{str(v)}]")
+            else:
+                strategy_values.append(str(v))
+        # 初始化metric的属性值列表
+        metric_values = []
+        # 遍历metric的属性字典
+        for k, v in metric_dict.items():
+            # 如果属性名是"tokens_per_npu_per_sec",则格式化为保留一位小数的字符串
+            if k == "tokens_per_npu_per_sec":
+                metric_values.append(f"{v:.1f}")
+            # 如果属性值是浮点数,则格式化为保留三位小数的字符串
+            elif isinstance(v, float):
+                metric_values.append(f"{v / 1000:.3f}")
+            # 如果属性值是列表或元组,则将每个元素格式化为保留三位小数的字符串,并用逗号连接
+            elif isinstance(v, (list, tuple)):
+                metric_values.append('[' + ','.join([f"{x / 1000:.3f}" for x in v]) + ']')
+        # 将strategy和metric的属性值拼接成一行数据
+        line = ','.join(strategy_values + metric_values) + '\n'
+
+        # 定义文件路径
+        file_path = f"{self.user_args.log_path}/results.csv"
+        try:
+            # 检查文件是否存在
+            if not os.path.isfile(file_path):
+                # 若文件不存在,先写入表头
+                with open(file_path, 'w', encoding='utf-8', newline='') as file:
+                    file.write(','.join(header) + '\n')
+                    file.write(line)
+            else:
+                # 追加写入数据行
+                with open(file_path, 'a', encoding='utf-8', newline='') as file:
+                    file.write(line)
+        except Exception as e:
+            # 如果写入文件失败,抛出异常
+            raise RuntimeError("写入 results.csv 失败") from e
+
+
+    def _write_to_sh(self, result_pair, rank, script):
+        """
+        将tinker并行策略嵌入用户预训练脚本,若没有,则仅生成一个
+        :param result_pair: tinker搜出的配置(ResultArgs与Metrics的元组)
+        :param rank: 配置排序(按时间)
+        :param script: 用户的预训练脚本
+        :return:
+        """
+        # 筛出并行策略参数
+        strategy, metric = result_pair
+        # 格式化输出文件名
+        info_text = f'time{metric.time_cost / 1000:.3f}_mem{metric.mem_cost:.2f}'
+        split_params = strategy.num_layers_list.replace(',', '_')
+        trainsh_path = (
+            f"{self.user_args.config_save_path}/{self.user_args.model_name}-{self.user_args.model_size}-rank{rank}"
+            f"_seq{self.user_args.seq_length}_tp{strategy.tp}_pp{strategy.pp}_sp{strategy.sp}"
+            f"_distopt{strategy.dist_opt}_mbs{strategy.mbs}_gbs{strategy.gbs}_L{split_params}_rc{strategy.recompute}"
+            f"_{info_text}.sh")
+
+        script_content = self._write_strategy_to_file(script, strategy)
+
+        # 写文件
+        write_lines(script_content, trainsh_path)
+
+
+    def _print_table(
+            self,
+            table_widths: Dict[str, int],
+            result_pairs: List[Tuple[ResultArgs, Metrics]],
+            title: str,
+            save: bool
+    ):
+        """
+        打印日志表格
+        :param table_widths: 字典,键为表头名称,值为对应列的宽度
+        :param result_pairs: 结果对列表,每个元素为一个元组,包含ResultArgs和Metrics对象
+        :param title: 表格标题
+        :param save: 是否处于保存模式,保存模式且开启detail时额外打印中间结果
+        """
+        # 打印日志表格
+        logger.info('=' * 40 + title + '=' * 40)
+        # 获取表头
+        headers = list(table_widths.keys())
+        # 根据每列宽度生成格式化字符串
+        formatter = '|'.join([f'{{:<{width}}}' for width in table_widths.values()])
+        # 生成分隔线(根据每列宽度生成 '—' 字符串)
+        sep_line = '·'.join(['—' * width for width in table_widths.values()])
+        # 首行分割线,特殊处理
+        logger.info('·' + sep_line + '·')
+        # 打印表头
+        logger.info('|' + formatter.format(*headers) + '|')
+        logger.info('·' + sep_line + '·')
+        # 遍历结果对列表
+        for config_rank, result_pair in enumerate(result_pairs):
+            # 获取结果字典
+            value_dict = self._get_result_dict(config_rank, result_pair)
+            # 根据表头获取每行的值
+            row_values = [str(value_dict[col]) for col in headers]
+            # 
打印每行数据 + logger.info('|' + formatter.format(*row_values) + '|') + if save and self.user_args.detail: + logger.info('·' + sep_line + '·') + # TODO:中间结果呈现 + search_args = SearchArgs(**{f.name: getattr(result_pair[0], f.name) for f in fields(SearchArgs)}) + task_param = TaskParam(search_args, result_pair[0].blocks) + num_layer_list = list(map(int, result_pair[0].num_layers_list.split(','))) + intervals = convert_to_pp_stage_block_idx(num_layer_list, sum(num_layer_list) + 3) + self.cost_model.calculate_cost(task_param, intervals, True) + logger.info('·' + sep_line + '·') diff --git a/profiler/msprof_analyze/tinker/tinker_auto_parallel.py b/profiler/msprof_analyze/tinker/tinker_auto_parallel.py index 34eca2b1157478bd39cc74759021297efb003430..e52cf58cce259963246ff3553c550c9b302a8fb1 100644 --- a/profiler/msprof_analyze/tinker/tinker_auto_parallel.py +++ b/profiler/msprof_analyze/tinker/tinker_auto_parallel.py @@ -22,7 +22,7 @@ if tinker_parent_dir not in sys.path: sys.path.append(str(tinker_parent_dir)) from tinker.profiler import profile_space -from tinker.search import gen_modellink_plan, cost_model +from tinker.search import optimize, cost_model from tinker.utils.config import parse_args, check_args @@ -31,7 +31,7 @@ def main(): args = parse_args() check_args(args) profile_space.run(args) - gen_modellink_plan.run(args) + optimize.run(args) cost_model.run(args) diff --git a/profiler/msprof_analyze/tinker/utils/block_args.py b/profiler/msprof_analyze/tinker/utils/block_args.py index 243326f672bb47a1dc0b4a78d995c44ed8f30e5e..ff232f82a67e56188e8c8f1d6200a898dc556adf 100644 --- a/profiler/msprof_analyze/tinker/utils/block_args.py +++ b/profiler/msprof_analyze/tinker/utils/block_args.py @@ -40,11 +40,11 @@ class DetailedInfo: optimizer_state: float = 0.0 inputs: float = 0.0 activation: float = 0.0 - zero_slice: float = 0.0 + dist_opt_slice: float = 0.0 recompute: float = 0.0 reserved_mem: float = 0.0 attention_mask_mem: float = 0.0 - dp_zero: int = 1 + dp_dist_opt: int = 1 num_fwd_act: int = 1 block_weight: List[float] = None block_act: List[float] = None @@ -60,47 +60,47 @@ class DetailedInfo: def print_info(self): self._round_3() # 各个成分 - logger.debug('Time Cost'.center(60, '-')) - logger.debug(f'block forward time(us): {self.block_fwd}') - logger.debug(f'block backward time with recompute(us): {self.block_bwd}') - logger.debug(f'forward time = {self.fwd / 1000:.3f} ms') - logger.debug(f'backward time = {self.bwd / 1000:.3f} ms') - - logger.debug('Memory Cost'.center(60, '-')) - model_optimizer_mem = self.weight + self.grad + self.weight_bf16 + self.full_precision_weight / self.dp_zero - logger.debug( + logger.info('Time Cost'.center(60, '-')) + logger.info(f'block forward time(us): {self.block_fwd}') + logger.info(f'block backward time with recompute(us): {self.block_bwd}') + logger.info(f'forward time = {self.fwd / 1000:.3f} ms') + logger.info(f'backward time = {self.bwd / 1000:.3f} ms') + + logger.info('Memory Cost'.center(60, '-')) + model_optimizer_mem = self.weight + self.grad + self.weight_bf16 + self.full_precision_weight / self.dp_dist_opt + logger.info( f'model & optimizer({model_optimizer_mem:.3f})' f' = {self._v("weight")} + {self._v("grad")} + {self._v("weight_bf16")}' - f' + {self._v("full_precision_weight")} / {self._v("dp_zero")}' + f' + {self._v("full_precision_weight")} / {self._v("dp_dist_opt")}' ) - logger.debug(f'block weights({self.block_weight})') - logger.debug(f'block activations({self.block_act})') - logger.debug(f'first time block 
activations({self.first_time_block_act})') + logger.info(f'block weights({self.block_weight})') + logger.info(f'block activations({self.block_act})') + logger.info(f'first time block activations({self.first_time_block_act})') def print_time(self, bubble_time, micro_batch_num, time_cost): unit_time = (self.fwd + self.bwd + self.input_comm + self.output_comm) / 1000 bubble_time = bubble_time / 1000 - unit_time - logger.debug(f'Unit Time({unit_time:.3f} ms)' + logger.info(f'Unit Time({unit_time:.3f} ms)' f' = {self._v("fwd")} + {self._v("bwd")} + {self._v("input_comm")} + {self._v("output_comm")}') - logger.debug(f'Time({time_cost / 1000:.3f})' + logger.info(f'Time({time_cost / 1000:.3f})' f' = bubble({bubble_time:.3f}) + mbn({micro_batch_num}) * unit_time({unit_time:.3f})') def print_mem_calc(self, mem_cost): self._round_3() # pipeline_fwd_act计算 - logger.debug( + logger.info( f'{self._v("pipeline_fwd_act")} = ' f'{self._v("num_fwd_act")}' f' * [{self._v("inputs")} + {self._v("activation")}]' ) - logger.debug( + logger.info( f'Memory({mem_cost:.3f})' f' = {self._v("weight")} + {self._v("grad")} + {self._v("weight_bf16")}' f' + [{self._v("full_precision_weight")} + {self._v("optimizer_state")}]' - f' / {self._v("dp_zero")}' + f' / {self._v("dp_dist_opt")}' f' + {self._v("pipeline_fwd_act")}' f' + {self._v("attention_mask_mem")}' - f' + {self._v("recompute")} + {self._v("reserved_mem")})' + f' + {self._v("recompute")} + {self._v("reserved_mem")}' ) def set_and_print(self, input_comm, output_comm, recompute_mem, reserved_mem_cost, mem_cost): @@ -124,13 +124,13 @@ class BlockArgs: """存block这一层级 所关注的训练优化策略,协同 ProfileArgs 参数,以及 BlockCost 数据下 去支撑 CostModel 中的一些计算""" def __init__(self, args, profile_args: ProfileArgs, block_cost: 'BlockCost'): - # todo dp zero往这里移 + # TODO dp dist_opt往这里移 self.profile_args = profile_args self.data = block_cost self.num_fwd_act = None self.recompute = None self.dp = None - self.zero = None + self.dist_opt = None self.is_first = False self.attention_mask_mem = 0.0 # 兼容老版本没存数据类型信息 @@ -147,14 +147,14 @@ class BlockArgs: @property def num_npu_block(self): """返回这个block涉及的NPU个数,通常一个stage中的block返回值都相等,所以调一个block的值就行""" - # todo 后面要把dp挪成自有属性 + # TODO 后面要把dp挪成自有属性 return self.profile_args.tp * self.dp def update_cost_model_args(self, cost_model_args: Dict[str, int]): for k, v in cost_model_args.items(): setattr(self, k, v) - # todo 全都是个入参,考虑把这些函数放到cost model里 + # TODO 全都是个入参,考虑把这些函数放到cost model里 def block_time(self, detail=False, detail_info: DetailedInfo = None) -> float: """前向 + 反向 + 重计算 + p2p通信 = fwd + bwd + rec_fwd + in_comm + out_comm""" compute_time = self.data.fwd * (1 + self.recompute) + self.data.bwd @@ -168,7 +168,7 @@ class BlockArgs: def block_mem(self, detail=False, detail_info: DetailedInfo = None) -> float: """ 权重 + 梯度 + 优化器 + 激活值 - = (1 + PM +(1 + PO) / dp_zero) * w + (SB + 1) * (is_first * input + is_recompute * act) + = (1 + PM +(1 + PO) / dp_dist_opt) * w + (SB + 1) * (is_first * input + is_recompute * act) :return: """ full_precision_weight = self.data.param_master * self.data.w @@ -179,16 +179,16 @@ class BlockArgs: input_mem = self.is_first * self.data.in_size activation_mem = self.data.in_size if self.recompute else self.data.act - dp_zero = self.dp if self.zero else 1 + dp_dist_opt = self.dp if self.dist_opt else 1 # memory 不同部分的计算 - # 初始: (2 + bf16 + 2 / dp_zero) * W - # step0结束: (4 / dp_zero) * W + # 初始: (2 + bf16 + 2 / dp_dist_opt) * W + # step0结束: (4 / dp_dist_opt) * W # 前反向: pipeline_fwd_act * A + attention_mask # 仅 transformer block 生成一次的 
attention_mask mem = 0 - mem += weight_mem + grad_mem + weight_bf16_mem + full_precision_weight / dp_zero - mem += optimizer_mem / dp_zero + mem += weight_mem + grad_mem + weight_bf16_mem + full_precision_weight / dp_dist_opt + mem += optimizer_mem / dp_dist_opt mem += self.num_fwd_act * (input_mem + activation_mem) mem += self.attention_mask_mem if detail: @@ -203,9 +203,9 @@ class BlockArgs: detail_info.activation += activation_mem detail_info.block_act.append(activation_mem) detail_info.first_time_block_act.append(activation_mem + self.attention_mask_mem) - detail_info.zero_slice += (grad_mem + optimizer_mem) / dp_zero + detail_info.dist_opt_slice += (grad_mem + optimizer_mem) / dp_dist_opt detail_info.attention_mask_mem += self.attention_mask_mem - detail_info.dp_zero = dp_zero + detail_info.dp_dist_opt = dp_dist_opt detail_info.num_fwd_act = self.num_fwd_act return mem @@ -220,6 +220,6 @@ class BlockCost: act: float fwd_reserved: float bwd_reserved: float - # todo 这2个变量 必给扔到别的地方 + # TODO 这2个变量 必给扔到别的地方 param_master: int = 2 - param_optimizer: int = 4 \ No newline at end of file + param_optimizer: int = 4 diff --git a/profiler/msprof_analyze/tinker/utils/config.py b/profiler/msprof_analyze/tinker/utils/config.py index 081dad71d8823f6d11b721bc48a6f4538c083987..3d958c4dde8bd2cf1dbc0dfa72d9a8c491763504 100644 --- a/profiler/msprof_analyze/tinker/utils/config.py +++ b/profiler/msprof_analyze/tinker/utils/config.py @@ -148,8 +148,8 @@ def add_simulate_args(parser: argparse.ArgumentParser): help='expert parallel') simulate_group.add_argument('--simu_sp', type=int, default=get_simulate_arg("simu_sp"), help='sequence parallel') - simulate_group.add_argument('--zero', type=int, default=get_simulate_arg("zero"), - help='mode of zero', choices=[0, 1]) + simulate_group.add_argument('--dist_opt', type=int, default=get_simulate_arg("dist_opt"), + help='mode of dist_opt', choices=[0, 1]) simulate_group.add_argument('-mbs', '--micro_batch_size', type=int, default=get_simulate_arg("micro_batch_size"), help='micro batch size') simulate_group.add_argument('--num_layer_list', type=str, default=get_simulate_arg("num_layer_list"), diff --git a/profiler/msprof_analyze/tinker/utils/constant.py b/profiler/msprof_analyze/tinker/utils/constant.py index 77f1817595d4e04a4c697916f59391818a94fd4d..f29057ae1dbe9f5cdcf2b1f091de53b6f2b2ebff 100644 --- a/profiler/msprof_analyze/tinker/utils/constant.py +++ b/profiler/msprof_analyze/tinker/utils/constant.py @@ -83,7 +83,7 @@ class SimulateParameter: simu_pp: int = 1 simu_ep: int = 1 simu_sp: int = 0 - zero: int = 0 + dist_opt: int = 0 micro_batch_size: int = 1 num_layer_list: list = None recompute: int = 0 diff --git a/profiler/msprof_analyze/tinker/utils/convert_to_trainsh.py b/profiler/msprof_analyze/tinker/utils/convert_to_trainsh.py deleted file mode 100644 index 1d646e2882201c0faa187a86a3437e9818e8b7c3..0000000000000000000000000000000000000000 --- a/profiler/msprof_analyze/tinker/utils/convert_to_trainsh.py +++ /dev/null @@ -1,126 +0,0 @@ -# Copyright (c) 2025, Huawei Technologies Co., Ltd. -# All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. - -# 定义原始脚本内容 -from collections import namedtuple -from typing import Tuple - -from tinker.search.data import Metrics, ParallelStrategy -from tinker.utils.utils import extract_between, del_line, write_lines - - -def convert_to_train_script(args, config: Tuple[ParallelStrategy, Metrics], config_rank: int, pretrain_script): - """ - 将tinker并行策略嵌入用户预训练脚本,若没有,则仅生成一个 - :param args: 用户参数 - :param config: tinker搜出的配置 - :param config_rank: 配置排序(按时间) - :param pretrain_script: 用户的预训练脚本 - :return: - """ - # 筛出并行策略参数 - strategy, metric = config - # 格式化输出文件名 - info_text = f'time{metric.time_cost / 1000:.3f}_mem{metric.mem_cost:.2f}' - split_params = strategy.num_layers.replace(',', '_') - trainsh_path = (f"{args.config_save_path}/{args.model_name}-{args.model_size}-rank{config_rank}" - f"_seq{args.seq_length}_tp{strategy.tp}_pp{strategy.pp}_sp{strategy.sp}" - f"_distopt{strategy.zero}_mbs{strategy.mbs}_gbs{strategy.gbs}_L{split_params}_rc{strategy.rc}" - f"_{info_text}.sh") - - script_content = write_strategy_to_file(pretrain_script, strategy) - - # 写文件 - write_lines(script_content, trainsh_path) - - -def write_strategy_to_file(script: str, strategy_param): - """ - 替换用户脚本中的指定参数,若为空,则返回 - :param script: - :param strategy_param: - :return: - """ - - # 直接建两个分类 - params_need_to_deleted = ['--tensor-model-parallel-size ', '--micro-batch-size ', '--global-batch-size ', - '--sequence-parallel ', '--use-distributed-optimizer ', '--recompute-method ', - '--recompute-granularity ', '--recompute-num-layers ', '--pipeline-model-parallel-size ', - '--num-layer-list', '--context-parallel-size ', '--context-parallel-algo ', - '--ulysses-degree-in-cp ', '--cp-attention-mask-type ', '--use-cp-send-recv-overlap ', - '--kv-head-repeat-before-uly-alltoall ', '--num-layers-per-virtual-pipeline-stage ', - '--overlap-grad-reduce ', '--overlap-param-gather '] - - params_need_to_append = [f'--tensor-model-parallel-size {strategy_param.tp} \\', - f'--micro-batch-size {strategy_param.mbs} \\', - f'--global-batch-size {strategy_param.gbs} \\', - f'--overlap-grad-reduce \\', - f'--pipeline-model-parallel-size {strategy_param.pp} \\'] - - if strategy_param.pp > 1: - params_need_to_append.append(f'--num-layer-list {strategy_param.num_layers} \\') - - if strategy_param.sp: - params_need_to_append.append('--sequence-parallel \\') - - if strategy_param.zero: - params_need_to_append.append('--use-distributed-optimizer \\') - - if strategy_param.rc: - params_need_to_append.append('--recompute-method uniform \\') - params_need_to_append.append('--recompute-granularity full \\') - params_need_to_append.append('--recompute-num-layers 1 \\') - - # 插入首部: - format_params = [' ' + value for value in params_need_to_append] - - tinker_strategy_params = '\n'.join(format_params) - tinker_search_args_str = 'TINKER_SEARCH_ARGS' - tinker_strategy_params = f"{tinker_search_args_str}=\"\n{tinker_strategy_params}\n\"" - - # 直接删完 - res = del_line(params_need_to_deleted, script) - - # 往首行插入 TINKER_SEARCH_ARGS - res = '\n'.join([tinker_strategy_params, res]) - - # 找到 torchrun xxx .py 或 python xxx.py,以便插入tinker参数 - run_key_words = ['torchrun', 'python'] - hit_key_word = None - - # 先找一圈,找不到直接报错 - for run_key_word in run_key_words: - cmd_content = extract_between(run_key_word, 'py', res) - if cmd_content is not None: - hit_key_word = run_key_word - break - - if hit_key_word is None: - # 可能是空白文件或者没以上 run_key_words 的脚本,直接返回 - return res.splitlines() - - 
num_skip_line = len(cmd_content.splitlines())
-    hit_key_word_idx = -1
-    res_lines = res.splitlines()
-    for idx, line in enumerate(res_lines):
-        if hit_key_word in line:
-            hit_key_word_idx = idx
-            break
-
-    # 在行尾命令行处,加上tinker 的参数
-    insert_idx = hit_key_word_idx + num_skip_line
-    tinker_args_in_cmd = ''.join(['    ${', tinker_search_args_str, '} \\'])
-    res_lines.insert(insert_idx, tinker_args_in_cmd)
-    return res_lines
\ No newline at end of file
diff --git a/profiler/msprof_analyze/tinker/utils/profile_args.py b/profiler/msprof_analyze/tinker/utils/profile_args.py
index 2b51a63c39ef81c6be6f8dd90f9aa248b936c794..58db8a9af179d104a6d6aada899942a90d594a85 100644
--- a/profiler/msprof_analyze/tinker/utils/profile_args.py
+++ b/profiler/msprof_analyze/tinker/utils/profile_args.py
@@ -15,15 +15,25 @@
 
 import re
 from dataclasses import dataclass, fields
+from typing import Optional
 
 from tinker.profiler.profile_space import ScriptArgs
 
 
 @dataclass(frozen=True)
 class ProfileArgs(ScriptArgs):
+    """
+    测量属性,继承自ScriptArgs
+
+    属性:
+    mbs: int - micro batch size
+    tp: int - tensor parallel size
+    sp: int - sequence parallel size
+    ep: int - expert parallel size
+    """
     mbs: int = 1
     algo: int = 0
-    model: str = None
+    model: Optional[str] = None
 
     @property
     def hint(self):
diff --git a/profiler/msprof_analyze/tinker/utils/utils.py b/profiler/msprof_analyze/tinker/utils/utils.py
index 853f88da1984688c11f85d51e453d102bf7f8983..e06883b7a33b60eef511ccd619846fc707c17e15 100644
--- a/profiler/msprof_analyze/tinker/utils/utils.py
+++ b/profiler/msprof_analyze/tinker/utils/utils.py
@@ -163,26 +163,6 @@ def write_lines(final_res: list, dest_file: str):
         raise RuntimeError(f'write to file: {dest_file} failed.') from e
 
 
-def print_result(user_args, config_result_pair):
-    strategy, metric = config_result_pair
-    time_costs = [round(time_cost / 1000, 3) for time_cost in metric.time_costs]
-    mem_costs = [round(mem_cost, 2) for mem_cost in metric.mem_costs]
-    tokens_per_npu_per_sec = (user_args.seq_length * user_args.global_batch_size / user_args.num_npus /
-                              metric.time_cost * 1000000)
-    info_text = (f'token/npu/sec = {tokens_per_npu_per_sec:.1f}, time_cost = {metric.time_cost / 1000:.3f}, '
-                 f'mem_cost = {metric.mem_cost:.2f}, ')
-
-    value_dict = dict(tp=strategy.tp, pp=strategy.pp, dp=strategy.dp, sp=strategy.sp, ep=strategy.ep,
-                      zero=strategy.zero, mbs=strategy.mbs,
-                      num_layer_list=list(map(int, strategy.num_layers.split(','))), rc=strategy.rc,
-                      time_costs=time_costs, mem_costs=mem_costs)
-    # 先加后删
-    if not strategy.is_moe:
-        del value_dict['ep']
-    info_text += ', '.join([f'{k}={v}' for k, v in value_dict.items()])
-    logger.info(info_text)
-
-
 def load_infos(args):
     """读取前序流程保存的模型结构等信息,并保存到全局变量args中"""
     model_info = find_files(args.profiled_data_path, 'model_info*.json')
@@ -342,4 +322,37 @@ def check_files_in_dir(path: str):
     """校验目录下存在文件"""
     if os.path.isdir(path) and len(os.listdir(path)) == 0:
         logger.error(f'No files in {path}')
-        raise Exception
\ No newline at end of file
+        raise Exception
+
+
+def convert_to_pp_stage_block_idx(num_layer_list: List[int], num_all_blocks_len: int):
+    """
+    格式转换
+    :param num_layer_list: 一种可能的划分方式, num_layer_list中的元素为每个stage的长度
+    :param num_all_blocks_len: 加上头尾blocks的长度
+    :return:
+    """
+    interval_layer_list = list()
+    start_num = 1
+    for stage_length in num_layer_list:
+        interval_layer_list.append((start_num, start_num + stage_length - 1))
+        start_num += stage_length
+    # 处理首尾
+    first_tuple = interval_layer_list[0]
+    interval_layer_list[0] = (0, first_tuple[1])
+    last_tuple = interval_layer_list[-1]
+    interval_layer_list[-1] = (last_tuple[0], num_all_blocks_len - 1)
+    return interval_layer_list
+
+
+def convert_to_num_layers(interval_layer_list):
+    """
+    格式逆转换:由各stage的block区间还原出每stage的层数字符串(扣除头尾的非transformer blocks)
+    :param interval_layer_list: 每个stage的(首block序号, 尾block序号)区间列表
+    :return: 逗号分隔的各stage层数,如 '4,4,4,4'
+    """
+    num_layer_list = [interval[1] - interval[0] + 1 for interval in interval_layer_list]
+    num_layer_list[0] -= 1
+    num_layer_list[-1] -= 2
+    num_layers = ','.join(map(str, num_layer_list))
+    return num_layers
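
Reviewer note (not part of the patch): a minimal sketch of how the two helpers relocated into tinker/utils/utils.py are expected to round-trip. The `sum(num_layer_list) + 3` head/tail-block offset mirrors its use in process.py; the concrete '4,4,4,4' split below is a hypothetical example.

    # Hypothetical usage sketch, assuming 4 pipeline stages of 4 transformer layers
    # each, plus 1 head block and 2 tail blocks (so len(blocks) == sum(layers) + 3).
    from tinker.utils.utils import convert_to_num_layers, convert_to_pp_stage_block_idx

    num_layer_list = [4, 4, 4, 4]
    intervals = convert_to_pp_stage_block_idx(num_layer_list, sum(num_layer_list) + 3)
    # intervals == [(0, 4), (5, 8), (9, 12), (13, 18)]
    # stage 0 absorbs the head block; the last stage absorbs both tail blocks
    assert convert_to_num_layers(intervals) == '4,4,4,4'  # inverse of the mapping above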