From f6bea3d15c4e7fd7243641322db279977dc81464 Mon Sep 17 00:00:00 2001 From: zhangning Date: Sat, 23 Aug 2025 16:46:40 +0800 Subject: [PATCH 1/9] =?UTF-8?q?=E6=8B=89=E5=8F=96=E6=9C=80=E6=96=B0?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=EF=BC=8C=E8=A7=A3=E5=86=B3=E5=86=B2=E7=AA=81?= =?UTF-8?q?=EF=BC=8C=E9=87=8D=E6=96=B0=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../framework_adapter/modellink_adapter.py | 33 +++- .../tinker/megatron_patch/arguments.py | 29 +++- .../tinker/megatron_patch/microbatches.py | 10 ++ .../megatron_patch/modellink_version.py | 12 +- .../tinker/model/adapter_utils.py | 42 ++++- .../tinker/model/block_adapters.py | 20 ++- .../tinker/model/block_infos.py | 1 + .../tinker/profiler/block_profiler.py | 138 ++++++++++----- .../tinker/profiler/gen_model_structure.py | 4 +- .../msprof_analyze/tinker/profiler/profile.sh | 125 +++++--------- .../tinker/profiler/profile_space.py | 157 +++++++++++++++--- .../tinker/search/cost_model.py | 71 +++++--- .../msprof_analyze/tinker/utils/block_args.py | 5 +- .../msprof_analyze/tinker/utils/config.py | 17 +- .../msprof_analyze/tinker/utils/constant.py | 30 ++-- profiler/msprof_analyze/tinker/utils/utils.py | 31 +++- profiler/msprof_analyze/tinker/version.py | 10 +- 17 files changed, 502 insertions(+), 233 deletions(-) diff --git a/profiler/msprof_analyze/tinker/framework_adapter/modellink_adapter.py b/profiler/msprof_analyze/tinker/framework_adapter/modellink_adapter.py index 2446a4aee..81686566d 100644 --- a/profiler/msprof_analyze/tinker/framework_adapter/modellink_adapter.py +++ b/profiler/msprof_analyze/tinker/framework_adapter/modellink_adapter.py @@ -23,10 +23,11 @@ get_model: 用于生成单层模型实例 import os from abc import ABC, abstractmethod -from typing import Dict, Tuple, Type, Union +from typing import Dict, Type import torch +from tinker.utils.constant import Version, version_parse class ModelLinkAdapter(ABC): @abstractmethod @@ -123,7 +124,8 @@ class ModelLinkAdapter11(ModelLinkAdapter): def pre_time_profile_backward_step(self): from megatron.core.pipeline_parallel import schedules from tinker.megatron_patch.schedules import custom_backward - self.original_custom_backward = schedules.custom_backward + if self.original_custom_backward is None: + self.original_custom_backward = schedules.custom_backward schedules.custom_backward = custom_backward def pre_mem_profile_backward_step(self): @@ -141,10 +143,13 @@ class ModelLinkAdapter11(ModelLinkAdapter): def get_model(self): from megatron.core.enums import ModelType + from megatron.training import get_model + from megatron.training.utils import unwrap_model from pretrain_gpt import model_provider args = self.get_args() args.model_type = ModelType.encoder_or_decoder - return model_provider() + model = get_model(model_provider, args.model_type)[0] + return unwrap_model(model) def deallocate_output_tensor(self, outputs, deallocate_outputs): from megatron.core.pipeline_parallel import schedules @@ -199,10 +204,21 @@ class ModelLinkAdapter100(ModelLinkAdapter11): """ 1.0.0相关接口 """ - + def __init__(self): + super.__init__() + self.original_autograd = None def initialize(self): from modellink.training.initialize import initialize_megatron initialize_megatron() + def pre_time_profile_backward_step(self): + from megatron.core.pipeline_parallel import schedules + from tinker.megtron_patch.schedules import custom_backward + if self.original_custom_backward is None: + self.original_custom_backward = schedules.custom_backward + schedules.custom_backward = custom_backward + def pre_mem_profile_backward_step(self): + from megatron.core.pipeline_parallel import schedules + schedules.custom_backward = self.original_custom_backward class ModelLinkAdapter200(ModelLinkAdapter100): @@ -212,6 +228,14 @@ class ModelLinkAdapter200(ModelLinkAdapter100): pass +version_map: Dict[str, Type[ModelLinkAdapter]] = { + '1.0': ModelLinkAdapter10, + '1.1': ModelLinkAdapter11, + '1.2': ModelLinkAdapter12, + '1.0.0': ModelLinkAdapter100, + '2.0.0': ModelLinkAdapter200 +} + class ModelLinkAdapterTune200(ModelLinkAdapter100): """ 2.0.0 tune 相关接口相较1.0.0 输入数据处理变更了 @@ -233,7 +257,6 @@ version_map: Dict[str, Type[ModelLinkAdapter]] = { '1.0.0': ModelLinkAdapter100, '2.0.0': ModelLinkAdapter200 } - tune_version_map: Dict[str, Type[ModelLinkAdapter]] = { '2.0.0': ModelLinkAdapterTune200 } diff --git a/profiler/msprof_analyze/tinker/megatron_patch/arguments.py b/profiler/msprof_analyze/tinker/megatron_patch/arguments.py index 5ed70cebd..6ea718018 100644 --- a/profiler/msprof_analyze/tinker/megatron_patch/arguments.py +++ b/profiler/msprof_analyze/tinker/megatron_patch/arguments.py @@ -17,12 +17,15 @@ from functools import wraps, partial from typing import Callable, Optional STORE_TRUE = 'store_true' -NUM_LAYERS = None # type: Optional[int] +NUM_LAYERS = 0 +NUM_DENSE = 0 def get_num_layers(): return NUM_LAYERS +def get_dense(): + return NUM_DENSE def extra_args_provider_decorator(extra_args_provider): @wraps(extra_args_provider) @@ -40,10 +43,22 @@ def parse_args_decorator(parse_args): def wrapper(extra_args_provider=None, ignore_unknown_args=False): decorated_provider = extra_args_provider_decorator(extra_args_provider) args = parse_args(decorated_provider, ignore_unknown_args) - # 提取layers 然后置1 - global NUM_LAYERS - NUM_LAYERS = args.num_layers + # layers相关置1操作 + global NUM_LAYERS, NUM_DENSE + #noop分析 + if hasattr(args, 'noop_layers'): + noop = args.noop_layers + if noop is not None: + NUM_LAYERS -= len(noop.split(',')) + args.noop_layers = None + NUM_LAYERS += int(args.num_layers) args.num_layers = 1 + if hasattr(args, 'first_k_dense_replace'): + num_dense = args.first_k_dense_replace + if num_dense is not None and num_dense > 0: + NUM_DENSE = int(num_dense) + args.first_k_dense_replace = 1 + args.num_layers = 2 return args return wrapper @@ -84,13 +99,11 @@ def profile_args_wrapper(fn: Callable): def override_profile_args(args): args.data_parallel_size = args.world_size // (args.pipeline_model_parallel_size * args.tensor_model_parallel_size * args.context_parallel_size) + if args.topk_group is not None and args.topk_group > args.expert_model_parallel_size: + args.topk_group = args.expert_model_parallel_size args.global_batch_size = args.data_parallel_size # 此处仅用于通过validation args.micro_batch_size = 1 - args.num_ops_in_each_stage = [1] args.virtual_pipeline_model_parallel_size = 1 - args.model_parallel_size_of_each_op = [[args.tensor_model_parallel_size]] - args.data_parallel_size_of_each_op = [[1]] args.model_name = "" - args.resharding_stages = [True] return args diff --git a/profiler/msprof_analyze/tinker/megatron_patch/microbatches.py b/profiler/msprof_analyze/tinker/megatron_patch/microbatches.py index 76f40816d..0d65486e1 100644 --- a/profiler/msprof_analyze/tinker/megatron_patch/microbatches.py +++ b/profiler/msprof_analyze/tinker/megatron_patch/microbatches.py @@ -58,4 +58,14 @@ def _build_num_microbatches_calculator(args): def rebuild_num_microbatches_calculator(): args = get_args() + try: + from megtron.core.num_microbatches_calculator import reconfigure_micro_batch_calculator + reconfigure_micro_batch_calculator(rank=args.rank, + rampup_batch_size=None, + global_batch_size = args.global_batch_size, + micro_batch_size = args.micro_batch_size, + data_parallel_size = args.data_parallel_size) + return + except ImportError: + pass _build_num_microbatches_calculator(args) diff --git a/profiler/msprof_analyze/tinker/megatron_patch/modellink_version.py b/profiler/msprof_analyze/tinker/megatron_patch/modellink_version.py index ec93d4a49..82adef578 100644 --- a/profiler/msprof_analyze/tinker/megatron_patch/modellink_version.py +++ b/profiler/msprof_analyze/tinker/megatron_patch/modellink_version.py @@ -16,6 +16,7 @@ import os import sys +from tinker.model.adapter_utils import error_free_import from tinker.utils.logger import logger @@ -26,15 +27,8 @@ def modellink_import(): raise RuntimeError("ML_PATH is not set") sys.path.append(modellink_path) try: - if modellink_version == "1.0": - from ascendspeed import megatron_adaptor - if modellink_version == "1.0.0" or modellink_version == "2.0.0": - import mindspeed_llm - sys.modules['modellink'] = sys.modules['mindspeed_llm'] - else: - import modellink - import megatron - except ModuleNotFoundError as e: + error_free_import() + except Exception as e: raise RuntimeError("ML_PATH is not available. Please make sure it is set correctly!") from e from tinker.megatron_patch.patch import patch logger.info(f'modellink path {modellink_path}') diff --git a/profiler/msprof_analyze/tinker/model/adapter_utils.py b/profiler/msprof_analyze/tinker/model/adapter_utils.py index fb7c8cd7c..cea4f19fa 100644 --- a/profiler/msprof_analyze/tinker/model/adapter_utils.py +++ b/profiler/msprof_analyze/tinker/model/adapter_utils.py @@ -19,18 +19,36 @@ import inspect import os import sys import textwrap +from ctypes.macholib.framework import framework_info from typing import List, Tuple, Dict import astor -from tinker.model.block_adapters import BlockAdapter, legacy_block_adapters, mcore_block_adapters +from tinker.model.block_adapters import BlockAdapter, legacy_block_adapters, mcore_block_adapters, \ + mcore_block_adapter_1_1_rc1 from tinker.utils.config import TINKER_DIR +from tinker.utils.constant import Version from tinker.utils.logger import logger +from tinker.utils.utils import write_lines, project_root, find_keywords_line_idx, get_lines, read_file, path_to_package, \ + check_path_exit + +from profiler.msprof_analyze.setup import version +from profiler.msprof_analyze.tinker.model.block_infos import get_block_adapters from tinker.utils.utils import write_lines, project_root, find_keywords_line_idx, get_lines, read_file, path_to_package from tinker.utils.constant import MODULE_NAME, PYTHON_STANDARD_INDENT +PYTHON_STANDARD_INDENT = ' ' * 4 + +MODULE_NAME = 'genned_block_forward' block_adapter_file_path = os.path.join(TINKER_DIR, f'model/{MODULE_NAME}.py') +def get_framework_path(version:str) ->str: + """临时方案""" + framework_path = os.path.join(project_root(), 'modellink-ref', f'modellink-{version}') + check_path_exit(framework_path) + return framework_path + + def find_source_code(location_list: List[List[str]]) -> Tuple[List[str], List]: """ @@ -48,6 +66,7 @@ def find_source_code(location_list: List[List[str]]) -> Tuple[List[str], List]: class_or_method_name = location[last_dot_index + 1:] try: # 动态导入包 + logger.debug(f'trying to load package {location}') module_obj = importlib.import_module(module_path) class_or_method_obj = getattr(module_obj, class_or_method_name) method_obj = getattr(class_or_method_obj, 'forward') if inspect.isclass( @@ -222,16 +241,16 @@ def gen_block_adapter(use_mcore_models): :param use_mcore_models: 是否使用 mcore :return: """ - package_path = os.getenv('ML_PATH', None) + package_path = get_framework_path(version) if not package_path: - raise RuntimeError("ML_PATH is not set") + raise RuntimeError(f'The version is not supported: {version}') if not os.path.exists(package_path): raise RuntimeError(f'The package path is not exist: {package_path}') # 这里特殊处理一下1.0 版本的patch,以防止后续 MethodLocation 导入报错 logger.info('The package_path is: %s', package_path) sys.path.append(package_path) error_free_import() - block_adapters = mcore_block_adapters if use_mcore_models else legacy_block_adapters + block_adapters = get_block_adapters(version, use_mcore_models) source_method_paths = [adapter.source_method_path for adapter in block_adapters] method_forward_source_code_list, module_obj_list = find_source_code(source_method_paths) # 1 import 部分 @@ -262,6 +281,12 @@ def gen_block_adapter(use_mcore_models): raise write_lines(file_content.splitlines(), block_adapter_file_path) +def get_block_adapters(version, use_mcore_models) + if use_mcore_models: + if version == Version.ML_2_0_0: + return mcore_block_adapters_1_1_rc1 + return mcore_block_adapters + return legacy_block_adapters def find_used_top_func(import_code_str, method_forward_str, module_obj): """ @@ -279,9 +304,10 @@ def find_used_top_func(import_code_str, method_forward_str, module_obj): import_tracker = ImportTracker() import_tracker.visit(tree) # 3. 获取不在import中,且被forward使用的顶层方法 - usage_finder = FuncUsageFinder(module_methods, import_tracker.imports) - usage_finder.visit(tree) - used_funcs_code = usage_finder.used_funcs_code + used_funcs_code = [] + for name in module_methods: + if name not in import_tracker.imports: + used_funcs_code.append(module_methods.get(name)) return used_funcs_code @@ -401,7 +427,7 @@ def get_effective_part(block_adapter: BlockAdapter, source_code: str): else: end_line_idx = len(source_code.splitlines()) - 1 if block_adapter.method_location.cut_mode: - target_code = cut_lines(source_code, start_line_idx, end_line_idx + 1) + target_code = cut_lines(source_code, start_line_idx + 1, end_line_idx + 1) else: target_code = get_lines(source_code, start_line_idx, end_line_idx + 1) return target_code diff --git a/profiler/msprof_analyze/tinker/model/block_adapters.py b/profiler/msprof_analyze/tinker/model/block_adapters.py index a8eda6ba9..4d3555126 100644 --- a/profiler/msprof_analyze/tinker/model/block_adapters.py +++ b/profiler/msprof_analyze/tinker/model/block_adapters.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from dataclasses import dataclass +from dataclasses import dataclass, replace from typing import List, Optional @@ -27,7 +27,7 @@ class MethodLocation: start_key_word: Optional[str] # 定位到整体方法后,实际提取部分的结束关键字 end_key_word: Optional[str] - # 是否开启cut_mode模式 + # 是否开启cut_mode模式,对中间变了的临时适配方法 cut_mode: bool = False @@ -87,7 +87,7 @@ legacy_block_adapters = [ BlockAdapter( block_name='transformer_block', method_location=MethodLocation( - source_method_path=['modellink.model.transformer.parallel_transformer_forward', + source_method_path=['modellink_llm.legacy.model.transformer.parallel_transformer_forward', 'megatron.model.transformer.ParallelTransformer', 'megatron.legacy.model.transformer.ParallelTransformer'], start_key_word=None, @@ -104,7 +104,7 @@ legacy_block_adapters = [ BlockAdapter( block_name='final_norm', method_location=MethodLocation( - source_method_path=['modellink.model.transformer.parallel_transformer_forward', + source_method_path=['modellink_llm.legacy.model.transformer.parallel_transformer_forward', 'megatron.model.transformer.ParallelTransformer', 'megatron.legacy.model.transformer.ParallelTransformer'], start_key_word='self.final_norm', @@ -119,7 +119,7 @@ legacy_block_adapters = [ BlockAdapter( block_name='post_process', method_location=MethodLocation( - source_method_path=['modellink.model.gpt_model.GPTModel', + source_method_path=['modellink_llm.legacy.model.gpt_model.GPTModel', 'mindspeed_llm.legacy.model.gpt_model.GPTModel'], start_key_word='post_language_model_processing', end_key_word='fp16_lm_cross_entropy' @@ -180,7 +180,7 @@ mcore_block_adapters = [ block_name='post_process', method_location=MethodLocation( source_method_path=['modellink.core.models.gpt.gpt_model.gpt_model_forward'], - start_key_word='decoder_input is not None', + start_key_word='args = get_args()', end_key_word='return hidden_states', cut_mode=True ), @@ -189,6 +189,12 @@ mcore_block_adapters = [ module_path="", weight_param_module=['output_layer'], input_source=[ParamSource('hidden_states'), + ParamSource('input_ids', from_forward=False), + ParamSource('position_ids', from_forward=False), + ParamSource('attention_mask', from_forward=False) ParamSource('labels', from_forward=False)] ) -] \ No newline at end of file +] + +mcore_block_adapters_1_1_rc1 = [adapter for adaptor in mcore_block_adapters] +mcore_block_adapters_1_1_rc1[3] = replace(mcore_block_adapters[3],method_location=replace(mcore_block_adapters[3].method_location,start_key_word='attention_mask)')) \ No newline at end of file diff --git a/profiler/msprof_analyze/tinker/model/block_infos.py b/profiler/msprof_analyze/tinker/model/block_infos.py index 45d31d8e4..2be98d225 100644 --- a/profiler/msprof_analyze/tinker/model/block_infos.py +++ b/profiler/msprof_analyze/tinker/model/block_infos.py @@ -71,6 +71,7 @@ class BlockInfo: self.adapter: BlockAdapter = block_adapter # block名称,仅起到标识作用,从BlockAdapter中获取 self.name: str = block_adapter.block_name + self.dump_name = self.name # block对应module,延时生成 self.module: torch.nn.Module = self._get_module(model) diff --git a/profiler/msprof_analyze/tinker/profiler/block_profiler.py b/profiler/msprof_analyze/tinker/profiler/block_profiler.py index 7b84a79fd..26be9f7a9 100644 --- a/profiler/msprof_analyze/tinker/profiler/block_profiler.py +++ b/profiler/msprof_analyze/tinker/profiler/block_profiler.py @@ -31,7 +31,7 @@ from torch_npu.npu import amp # 选择引用的ModelLink版本 from tinker import megatron_patch from tinker.framework_adapter.modellink_adapter import get_adapter, ModelLinkAdapter -from tinker.megatron_patch.arguments import get_num_layers +from tinker.megatron_patch.arguments import get_num_layers,get_dense from tinker.megatron_patch.microbatches import rebuild_num_microbatches_calculator from tinker.model.block_infos import get_model_block_infos, BlockInfo from tinker.profiler.profile_classes import ProfileArgs, InitTensorInfo @@ -76,13 +76,18 @@ class TinkerProfiler: self.bwd_timer = NPUTimer() self._set_vars() + # shapes for input tensors and extra tensors + self.input_shape_dict = {} # type: Dict[str, Dict[str, List[int]]] + self.input_extra_dict = {} # type: Dict[str, Dict[str, List[int]]] # size of inputs and outputs self.input_size_dict = {} self.output_size_dict = {} self.activation_size_dict = {} + #使用dump_name访问 self.weight_size_dict = {} self.last_mbs = None self.block_tensor_map = None + self.origin_module = None @staticmethod def calculate_data_size(tensors): @@ -135,13 +140,29 @@ class TinkerProfiler: """ return 'attention_mask' not in tensor_name and 'rotary_pos_emb' not in tensor_name and tensor_value is not None + @staticmethod + def _generate_block_strucure(): + num_layers = get_num_layers() + num_dense = get_dense() + structure = [['embedding', 1]] + if num_dense: + structure.append(['transformer_dense', num_dense]) + structure.append(['transformer_moe', num_layers - num_dense]) + else: + structure.append(['transformer_block', num_layers - num_dense]) + structure.append(['final_norm', 1]) + structure.append(['post_process', 1]) + return structure + def dump_model_info(self): """ 在profile_data中保存seq_length等信息,供后续search等流程使用 """ - dump_args = ['seq_length', 'fp16', 'bf16'] - dump_dict = {arg: getattr(self.args, arg) for arg in dump_args} + #search模块需要使用的args信息 + dump_args = ['seq_length', 'fp16', 'bf16', 'use_mcore_models', 'hidden_size'] + dump_dict = {arg: getattr(self.args, arg, None) for arg in dump_args} dump_dict['num_layers'] = get_num_layers() + dump_dict['block_structure'] = self._generate_block_strucure() dump_file_name = f'model_info_seq{self.args.seq_length}.json' dump_file_path = os.path.join(self.args.prof_path, dump_file_name) os.makedirs(os.path.dirname(dump_file_path), exist_ok=True) @@ -155,13 +176,16 @@ class TinkerProfiler: :return: """ for block_info in block_infos: - cur_input_tensors, cur_output_tensors = self.block_tensor_map[block_info.name] - block = block_info.get_block() + run_times = 0 + while self._should_run(block_info, run_times): + run_times += 1 + cur_input_tensors, cur_output_tensors = self.block_tensor_map[block_info.name] + block = block_info.get_block() - self.weight_size_dict[block_info.name] = np.prod(block.weight_size) * self.data_base + self.weight_size_dict[block_info.dump_name] = np.prod(block.weight_size) * self.data_base - self.input_size_dict[block_info.name] = self.calculate_data_size(cur_input_tensors) - self.output_size_dict[block_info.name] = self.calculate_data_size(cur_output_tensors) + self.input_size_dict[block_info.name] = self.calculate_data_size(cur_input_tensors) + self.output_size_dict[block_info.name] = self.calculate_data_size(cur_output_tensors) def get_inputs(self, block_name): """ @@ -216,10 +240,7 @@ class TinkerProfiler: # Profiling forward/backward computation time - # 让反向中保存计算图以支撑多次调用 - self.adapter.pre_time_profile_backward_step() - - if "post" in block_info.name: + if "post" in block_info.name or 'moe' in block_info.dump_name: # 需匹配梯度版本,1次前向 1次反向 交替进行 for index in range(self.args.prof_repeat_times[0] + self.args.prof_warmup_times): # 内存溢出得厉害,提个函数尝试规避下 @@ -264,7 +285,8 @@ class TinkerProfiler: self.adapter.pre_time_profile_backward_step() # 同步所有rank torch.distributed.barrier() - self.adapter.backward_step(backward_input_tensors, origin_outputs, output_grads) + #self.adapter.backward_step(backward_input_tensors, origin_outputs, output_grads) + self.adapter.custom_backward(origin_outputs,output_grads[0]) torch.distributed.barrier() # 额外跑一次对齐时间 self.adapter.custom_backward(origin_outputs, output_grads[0]) @@ -321,6 +343,10 @@ class TinkerProfiler: if backward_input_tensors is None: torch.autograd.backward(outputs, grad_tensors=output_grads, retain_graph=True) else: + backward_input_tensors = [] + for input_name in input_data: + if input_data[input_name] is not None and input_data[input_name].requires_grad: + backward_input_tensors.append(input_data[input_name]) self.adapter.backward_step(backward_input_tensors, outputs, output_grads) mem_reserved_bwd = byte_to_mb(torch.cuda.memory_reserved()) @@ -361,8 +387,7 @@ class TinkerProfiler: return args = self.args profile_logger.info(f"====== PROFILING RESULTS ({self.profile_args.model}{self.profile_args.hint}) ======") - result_title = ["config", "block_name", "forward-compute", "backward_compute", "input_size", "output_size", - "weights", + result_title = ["block_name", "forward-compute", "backward-compute", "input_size", "output_size", "weights", "activations", "fwd_reserved", "bwd_reserved"] csv_path = os.path.join(args.prof_path, "profiled_data.csv") csv_exists = os.path.isfile(csv_path) @@ -398,19 +423,22 @@ class TinkerProfiler: def update_tensor_map(self, block_infos: List[BlockInfo], mbs): forward_output = None + InitTensorInfo = namedtuple('InitTensorInfo', ['shape', 'requires_grad', 'device', 'dtype', 'element_size']) # 初始化 if self.block_tensor_map is None: first_input = self._get_first_input() block_tensor_map = {} for block_info in block_infos: + profile_logger.debug(f'[Tinker.Debug] 准备update_tensor_map - {block_info.name}') genned_block = block_info.get_block() wrapped_block = self.adapter.wrap_block(genned_block) # 拿到当前 block 的 input_tensors_info 和 input_extra_tensors_info input_tensors = block_info.get_input_tensors(first_input, forward_output) + torch.distributed.barrier() forward_output = wrapped_block(input_tensors) - - extract_input_tensors_info = self.extract_input_tensors(input_tensors) - extract_output_tensors_info = self.extract_input_tensors(forward_output) + profile_logger.debug(f'[Tinker.Debug] tensor信息获取成功 - {block_info.name}') + extract_input_tensors_info = self.extract_input_tensors(input_tensors, InitTensorInfo) + extract_output_tensors_info = self.extract_input_tensors(forward_output, InitTensorInfo) block_tensor_map[block_info.name] = extract_input_tensors_info, extract_output_tensors_info self.block_tensor_map = block_tensor_map self.last_mbs = mbs @@ -463,37 +491,56 @@ class TinkerProfiler: for block_info in block_infos: if oom: # 因profile_space的日志监控会在block_profiler发生oom时kill进程,因此该处逻辑不会进行;但保留该处以防kill失败 - profile_logger.info(f'[results] already oom, skip {block_info.name}') - self.profiled_results[block_info.name] = [EXCEPTIONAL_VALUE, EXCEPTIONAL_VALUE, + profile_logger.info(f'[results] already oom, skip {block_info.dump_name}') + self.profiled_results[block_info.dump_name] = [EXCEPTIONAL_VALUE, EXCEPTIONAL_VALUE, self.input_size_dict[block_info.name], self.output_size_dict[block_info.name], - self.weight_size_dict[block_info.name], + self.weight_size_dict[block_info.dump_name], EXCEPTIONAL_VALUE, EXCEPTIONAL_VALUE, EXCEPTIONAL_VALUE] continue - profile_logger.info(f"working on {block_info.name}{self.profile_args.hint} ... ") - try: - fwd_time, bwd_time, reserved_fwd, reserved_bwd, allocated_fwd = self.profile_block(block_info) - except RuntimeError as e: - if "NPU out of memory" in str(e): - # OOM 没必要测试更大的mbs - oom = True - # break - profile_logger.error(f'RANK{torch.distributed.get_rank()}: {"-*/" * 20}') - profile_logger.error(e) - traceback.print_exc() - fwd_time = bwd_time = EXCEPTIONAL_VALUE - reserved_fwd = reserved_bwd = allocated_fwd = EXCEPTIONAL_VALUE + run_times = 0 + while self._should_run(block_info,run_times): + run_times += 1 + oom = self._get_result(block_info, oom) + self.dump_profiled_results() + return not oom + + def _get_result(self, block_info, oom): + + profile_logger.info(f"working on {block_info.dump_name}{self.profile_args.hint} ... ") + try: + fwd_time, bwd_time, reserved_fwd, reserved_bwd, allocated_fwd = self.profile_block(block_info) + except RuntimeError as e: + if "NPU out of memory" in str(e): + # OOM 没必要测试更大的mbs + oom = True + # break + + torch.distributed.barrier() + time.sleep(torch.distributed.get_rank() / 3) + profile_logger.error(f'RANK{torch.distributed.get_rank()}: {"-*/" * 20}') + profile_logger.error(e) + traceback.print_exc() + fwd_time = bwd_time = EXCEPTIONAL_VALUE + reserved_fwd = reserved_bwd = allocated_fwd = EXCEPTIONAL_VALUE profile_logger.info(f"[results] {block_info.name}: fwd_compute = {fwd_time:.2f} us, " f"bwd_compute = {bwd_time:.2f} us, fwd_allocated = {allocated_fwd:.1f} MB, " f"fwd_reserved = {reserved_fwd:.1f} MB, bwd_reserved = {reserved_bwd:.1f} MB.") - self.profiled_results[block_info.name] = [fwd_time, bwd_time, + self.profiled_results[block_info.name] = [fwd_time, bwd_time, self.input_size_dict[block_info.name], self.output_size_dict[block_info.name], - self.weight_size_dict[block_info.name], + self.weight_size_dict[block_info.dump_name], allocated_fwd, reserved_fwd, reserved_bwd] - self.dump_profiled_results(block_infos) - return not oom + return oom + + def _should_run(self, block_info:BlockInfo, run_times: int): + should_modify = get_dense() and 'transformer' in block_info.name and run_times <2 + if should_modify: + #首次进入,先将layers搞成dense + self._modify_transformer_layers(block_info, already_dense=run_times) + return not run_times or should_modify + def _set_vars(self): self.args.iteration = 1 @@ -501,9 +548,9 @@ class TinkerProfiler: tp=self.args.tensor_model_parallel_size, sp=int(self.args.sequence_parallel), ep=self.args.expert_model_parallel_size, + cp=self.args.context_parallel_size, + extend=int(getattr(self.args, 'moe_tp_extend_ep', False)), mbs=1, - algo=0, - model=f'{self.args.prof_model_name}_{self.args.prof_model_size}' # from cur_model_name ) self.data_base = byte_to_mb(self.args.params_dtype.itemsize) # from DATA_BASE # fp32存权重梯度 @@ -513,6 +560,16 @@ class TinkerProfiler: """此处使用modellink代码,需密切关注patch、变更情况""" return HeadInputTensor(self.adapter.get_head_input()) + def _modify_transformer_layers(self,block_info:BlockInfo, already_dense): + if not already_dense: + block_info.dump_name = 'transformer_dense' + if self.origin_module is None: + self.origin_module = block_info.module.layers + block_info.module.layers = torch.nn.ModuleList(self.origin_module[0]) + elif self.origin_module is not None: + block_info.dump_name = 'transformer_moe' + block_info.module.layers = torch.nn.ModuleList(self.origin_module[1]) + def get_backward_input_tensors(block_info, input_data): """""" @@ -532,6 +589,7 @@ def main(): init_profile_log(logging.INFO) tinker_profiler = TinkerProfiler(adapter) args = adapter.get_args() + args.model_text = f'{args.prof_model_name}-{args.prof_model_size}' # get profiling tasks # "task"s are defined by unique {model, size, mbs} pairs all_tasks_mbs = [] diff --git a/profiler/msprof_analyze/tinker/profiler/gen_model_structure.py b/profiler/msprof_analyze/tinker/profiler/gen_model_structure.py index 67eeda985..d244cf08f 100644 --- a/profiler/msprof_analyze/tinker/profiler/gen_model_structure.py +++ b/profiler/msprof_analyze/tinker/profiler/gen_model_structure.py @@ -37,10 +37,10 @@ DEL_PARAM_IN_GPT_ARGS = [ '--overlap-param-gather ', '--num-layer-list ', '--load ', '--save ', '--recompute-granularity ', '--recompute-method ', '--recompute-num-layers ', '--context-parallel-size ', '--context-parallel-algo ', '--ulysses-degree-in-cp ', '--cp-attention-mask-type ', '--use-cp-send-recv-overlap ', - '--kv-head-repeat-before-uly-alltoall ' + '--kv-head-repeat-before-uly-alltoall ', '--moe-tp-extend-ep' ] -DEL_PARAM_IN_MOE_ARGS = ['--expert-parallel-size '] +DEL_PARAM_IN_MOE_ARGS = ['--expert-model-parallel-size '] def replace_export(match): diff --git a/profiler/msprof_analyze/tinker/profiler/profile.sh b/profiler/msprof_analyze/tinker/profiler/profile.sh index 128566bcb..e4a7900ee 100644 --- a/profiler/msprof_analyze/tinker/profiler/profile.sh +++ b/profiler/msprof_analyze/tinker/profiler/profile.sh @@ -20,28 +20,39 @@ if [[ $(pwd) == *"ma-user"* ]]; then fi # 1. 命令行入参校验 若异常则提示检查`profile_space.py` -if [ "$#" -lt 4 ]; then - echo "Error: Script profile.sh requires at least 4 arguments, but get $# arguments" - echo " Supposed arguments: model_name model_size TP SP EP=0 mbs_limit=65536 save_path=./profiled_data suffix=DateTimeStamp version=1.1" +if [ "$#" -lt 7 ]; then + echo "Error: Script profile.sh requires at least 7 arguments, but get $# arguments" + echo " Supposed arguments: model_name model_size exp_id num_npu profiled_data_path ml_version profiled_args" echo " Please check TinkerScripter.run_config() in profile_space.py" exit 1 fi -SUFFIX=-${8:-"$(date '+%y%m%d-%H%M%S')"} -if [ $SUFFIX == "-" ]; then - SUFFIX="" -fi -RUNTIME_PATH=${7:-"$(pwd)/profiled_data"} -mbs_limit=${6:-65536} -ep=${5:-0} -export ML_VERSION=${9:-1.1} +MODEL_NAME = ${1} +MODEL_SIZE = ${2} +EXP_ID = ${3} +NPROC = ${4} +PROFILING_PATH = ${5} +#profiler 需要使用的训练框架版本信息 +export ML_VERSION =${6} +TINKER_ARGS = ${7} export IS_TUNE=${10:-0} +#可调节的profiler超参,目前看取3-10和10-40无影响 +WARMUP_TIMES = 3 +REPEAT_TIMES =10 + +echo "=====================================work on ${EXP_ID} =======================================" +#2. 允许在其他路径使用tinker SCRIPT_DIR=$(dirname "$(realpath "$0")") PROJECT_PATH=$(dirname $(dirname "$SCRIPT_DIR")) export PYTHONPATH=$PROJECT_PATH:$PYTHONPATH -# 2. 变量初始化,其中系统变量的export相关逻辑在各model.sh中完成 +#临时适配路径,后续将优化路径的输入和对版本信息的使用 +export ML_PATH="${PROJECT_PATH}/modellink-ref/modellink-${ML_VERSION}" +echo "profiler used ml_path: ${ML_PATH}" + + +#3.分布式训练相关参数生成 find_free_port() { local port=7000 while netstat -an | grep -q "$port"; do @@ -50,11 +61,11 @@ find_free_port() { echo $port } +NNODES=1 +NODE_RANK=0 MASTER_ADDR=localhost MASTER_PORT=$(find_free_port) echo 使用端口 $MASTER_PORT -NNODES=1 -NODE_RANK=0 DISTRIBUTED_ARGS=" --nnodes ${NNODES} \ @@ -63,16 +74,9 @@ DISTRIBUTED_ARGS=" --master_port ${MASTER_PORT} \ " -# 可调节的profiler超参,目前看取3-10和10-40无影响 -WARMUP_TIMES=3 -REPEAT_TIMES=10 - -MAX_NUM_GPUS=8 - -# 3. 模型结构参数脚本,读取模型结构命令行参数 +# 4.模型结构参数狡辩,读取模型结构命令行参数 ascend_model_script="$SCRIPT_DIR/ascendcloud_model_${1}_${2}.sh" - -model_script="$SCRIPT_DIR/model_${1}_${2}.sh" +model_script="$SCRIPT_DIR/model_${MODEL_NAME}_${MODEL_SIZE}.sh" # 检查脚本文件是否存在 if [ -f "$model_script" ]; then @@ -84,35 +88,14 @@ else exit 1 fi -# 为不同环境生成存放词表和数据文件的路径 -if [ -z "$ML_MODEL_PATH" ]; then - CURRENT_PATH=$(pwd) - if [[ $CURRENT_PATH == *"ma-user"* ]]; then - export ML_MODEL_PATH="/home/ma-user/work/modellink-resources" - else - export ML_MODEL_PATH="." - fi -fi - # 待覆盖变量 GPT_ARGS="" -MOE_ARGS="" -# 此脚本应使能 WITHOUT_JIT_COMPILE TOKENIZER_PATH GPT_ARGS MOE_ARGS 的刷新 +# 此脚本应使能GPT_ARGS的刷新 echo "source $effect_script" source $effect_script -# 此处代码和effect_script有顺序关系 -MODEL_NAME=$1 -MODEL_SIZE=$2 - -# 4. 数据落盘地址 -PROFILING_PATH="${RUNTIME_PATH}/profiled-data-${MODEL_NAME}-${MODEL_SIZE}${SUFFIX}" # 若目录不存在,则会自动创建 -if [ "$#" -lt 8 ]; then - rm -rf $PROFILING_PATH # 未指定时,才删除重复拉起的目录,但目前没用 -fi -mkdir -p ${PROFILING_PATH} - +#4.整合入参 拉起训练 PROF_ARGS=" --prof-path ${PROFILING_PATH} \ --prof-model-name ${MODEL_NAME} \ @@ -121,44 +104,16 @@ PROF_ARGS=" --prof-repeat-times ${REPEAT_TIMES} \ " -torch_run() { - local tp=$1 - local sp=$2 - local ep=$3 - local mbs_limit=$4 - local dp=1 - if ((ep >= 1)); then - let dp=ep - fi - if ((tp * dp > MAX_NUM_GPUS || tp == 1 && sp == 1)); then - return 1 - fi - EXP_ID="tp${tp}_sp${sp}_ep${ep}" - echo "================================ working on ${EXP_ID} ================================" - let gpu=tp*dp - SUMMARIZE_ARGS=" - ${PROF_ARGS} - ${GPT_ARGS} - --tensor-model-parallel-size ${tp} - --pipeline-model-parallel-size 1 - --distributed-timeout-minutes 5 - " - if [ "${ep}" -ge 1 ]; then - SUMMARIZE_ARGS="${SUMMARIZE_ARGS} ${MOE_ARGS} --expert-model-parallel-size ${ep}" - fi - if [ "${sp}" -eq 1 ]; then - SUMMARIZE_ARGS="${SUMMARIZE_ARGS} --sequence-parallel" - fi - # 可规避一部分mbs oom情况 - SUMMARIZE_ARGS="${SUMMARIZE_ARGS} --prof-mbs-limit ${mbs_limit}" - echo [TIME] before profiling ${EXP_ID} : $(date '+%Y-%m-%d-%H-%M-%S') >> ${PROFILING_PATH}/profiling_${MODEL_NAME}.log - - torchrun ${DISTRIBUTED_ARGS} --nproc_per_node ${gpu} $SCRIPT_DIR/block_profiler.py \ +SUMMARIZE_ARGS=" + ${PROF_ARGS} + ${GPT_ARGS} + ${TINKER_ARGS} +" +echo " + torchrun ${DISTRIBUTED_ARGS} --nproc_per_node ${NPROC} $SCRIPT_DIR/block_profiler.py \ ${SUMMARIZE_ARGS} \ 2>&1 | tee ${PROFILING_PATH}/profiling_${MODEL_NAME}_${EXP_ID}.log - - echo [TIME] after profiling ${EXP_ID} : $(date '+%Y-%m-%d-%H-%M-%S') >> ${PROFILING_PATH}/profiling_${MODEL_NAME}.log -} - -# 6. 拉起该次profiler任务: tp sp ep mbs_limit -torch_run $3 $4 $ep $mbs_limit \ No newline at end of file +" +torchrun ${DISTRIBUTED_ARGS} --nproc_per_node ${NPROC} $SCRIPT_DIR/block_profiler.py \ + ${SUMMARIZE_ARGS} \ + 2>&1 | tee ${PROFILING_PATH}/profiling_${MODEL_NAME}_${EXP_ID}.log \ No newline at end of file diff --git a/profiler/msprof_analyze/tinker/profiler/profile_space.py b/profiler/msprof_analyze/tinker/profiler/profile_space.py index 4cc7a435a..71e7ec130 100644 --- a/profiler/msprof_analyze/tinker/profiler/profile_space.py +++ b/profiler/msprof_analyze/tinker/profiler/profile_space.py @@ -32,6 +32,7 @@ from tinker.profiler import gen_model_structure from tinker.profiler.profile_classes import ScriptArgs from tinker.model.adapter_utils import gen_block_adapter from tinker.utils.config import TINKER_DIR +from tinker.utils.constant import Version from tinker.utils.logger import init_log, logger from tinker.utils.utils import extract_and_format_model_size from tinker.version import dump_task_info @@ -70,6 +71,88 @@ class MemoryImpact: return cls._opposite(impact) +@dataclass(frozen=True) +class ScriptArgs: + tp: int = 1 + sp: int = 0 + #此EP指放在CP和DP上的EP + ep: int = 1 + cp: int = 1 + # --moe-tp-extend-ep:做的TP在MoE-mlp部分做EP,此处为01取值,1为开启,开启后实际EP = TP + EP + extend: int =0 + + + @property + def exp_id(self): + return '_'.join([f'{key}{value}' for key, value in self.__dict__.items()]) + + @property + def npu_used(self): + """ + 计算改组block内并行策略占用的卡数,用于约束检测和下发任务 + """ + p_times = 1 + for p in [self.tp, self.cp]: + if p > 1: + p_times *=p + #若EP在CP中放不下,则要新增DP + if self.cp % self.ep: + if self.ep % self.cp: + p_times *= self.ep + else: + p_times *= self.ep // self.cp + return p_times + + def items(self): + return self.__dict__.items() + + def is_legal(self, max_npu, args): + illegal = [ + max_npu % self.npu_used, + self.tp == 1 and self.sp, + args.hidden_size % self.tp, + args.ffn_hidden_size % self.tp, + args.num_attention_heads % self.tp, + #默认使用ulysses-CP 头数需能被TP*CP整除 + args.num_attention_heads % (self.tp * self.cp), + args.group_query_attention and args.num_query_groups % self.tp, + args.seq_length % self.tp and self.sp, + args.seq_length % self.cp, + self.ep > 1 and not hasattr(args, 'num_experts') + ] + if not hasattr(args, 'num_experts'): + return not any(illegal) + # MoE + illegal.extend([ + self.tp > 1 and not self.sp, + self.tp == 1 and self.extend, + self.extend and args.num_experts % (self.tp * self.ep), + args.num_experts % self.ep, + ]) + return not any(illegal) + + def build_megtron_command_line_arguments(self, version, max_mbs, kv_repeat): + result = [f'--tensor-model-parallel-size {self.tp}', f'--pipeline-model-parallel-size 1' + f'--distributed-timeout-minutes 5', f'--prof-mbs-limit {max_mbs}'] + if self.ep > 1 : + result.append(f'--expert-model-parallel-size {self.ep}') + #TODO 后续版本军科启用 --moe-alltoall-overlap-comm, 无法与gemm-gradient-accumulation-fusion 同时使用 + if version == Version.ML_2_0_0 and self.tp == 1: + result.append(f'--moe-alltoall-overlap-comm') + if self.sp: + result.append(f'--sequence-parallel') + if self.cp > 1: + result.append(f'--context-parallel-size {self.cp}') + result.append(f'--context-parallel-algo ulysses_cp_algo') + result.append(f'--use-cp-send-recv-overlap') + if kv_repeat: + result.append(f'--kv-head-repeat-before-uly-alltoall') + if self.extend: + result.append(f'--moe-tp-extend-ep') + result.append('--moe-alltoall-overlap-comn') + return '\n'.join(result) + + @dataclass class Feature: name: str @@ -164,24 +247,32 @@ def pre_log(pre_logging_text): class TinkerScripter: MAX_MBS = 2 ** 30 MAX_NPU = 8 - TP = None - SP = None - EP = None - def __init__(self, model, model_size, suffix, save_path, version, model_args, is_full_tune): + def __init__(self, model, model_size, suffix, args, model_args,is_full_tune): self.model = model self.model_size = model_size self.profiler_script = f"{TINKER_DIR}/profiler/profile.sh" self.suffix = suffix - self.save_path = save_path - self.version = version + self.save_path = args.save_path + self.version = args.version + self.profiled_data_path = args.profiled_data_path + self.args = args self.model_args = model_args self.is_full_tune = '1' if is_full_tune else '0' + @property + def can_cp(self): + return (self.version in [Version.ML_1_0_0, Version.ML_2_0_0, Version.ML_1_0_rc3] + and self.model_args.use_mcore_models) + @property def can_ep(self): return hasattr(self.model_args, 'num_experts') + @property + def can_extend(self): + return self.version in [Version.ML_2_0_0] + @staticmethod def post_process(mbs_limit, oom, process, torchrun_failed): try: @@ -193,36 +284,52 @@ class TinkerScripter: logger.info(f"stderr: {stderr}") if process.returncode: if oom: - logger.info(f"profile内存溢出于{mbs_limit},将裁剪剩余并行策略探索空间") + logger.info(f"profile因oom止步于{mbs_limit},将裁剪剩余并行策略探索空间") elif torchrun_failed: logger.warning(f"torchrun执行错误") + raise RuntimeError('Tinker 功能出现问题,Torchrun 运行失败') else: logger.warning(f"脚本执行错误") @staticmethod def is_valid_value(value, space): + if isinstance(space, str): + return value in set(map(int, space.split(','))) return space is None or value in space def get_arg_space(self): - tp_space = [i for i in range(1, self.MAX_NPU + 1) if self.is_valid_value(i, self.TP)] - sp_space = [i for i in range(2) if self.is_valid_value(i, self.SP)] - ep_space = [i for i in range(1, self.MAX_NPU + 1)] if self.can_ep else [1] + tp_space = [i for i in range(1, self.MAX_NPU + 1) if self.is_valid_value(i, self.args.prof_tp)] + sp_space = [i for i in range(2) if self.is_valid_value(i, self.args.prof_sp)] + ep_space = [i for i in range(1, self.MAX_NPU + 1) + if self.is_valid_value(i, self.args.prof_ep)] if self.can_ep else [1] + cp_space = [i for i in range(1, self.MAX_NPU + 1) + if self.is_valid_value(i, self.args.prof_cp)] if self.can_cp else [1] + extend_space = [i for i in range(2) + if self.is_valid_value(i, self.args.prof_extend)] if self.can_extend else [0] return ArgSpace([ Feature(name="tp", range=tp_space, memory_impact=MemoryImpact.REDUCE), Feature(name="sp", range=sp_space, memory_impact=MemoryImpact.REDUCE), Feature(name="ep", range=ep_space, memory_impact=MemoryImpact.REDUCE), + #CP在开启kv_repeat时 + Feature(name="cp", range=cp_space, memory_impact=MemoryImpact.REDUCE), + Feature(name="extend", range=extend_space, memory_impact=MemoryImpact.NEUTRAL), ], self.MAX_NPU, self.MAX_MBS, self.model_args) + def get_kv_repeat(self, config:ScriptArgs) -> bool: + return self.model_args.group_query_attention and config.cp > self.model_args.num_query_groups + def run_config(self, config: ScriptArgs, max_mbs: int) -> int: # 格式化model_size model_size = extract_and_format_model_size(self.model_size) - # 测量block内并行策略性能 tp sp ep - command = ['bash', self.profiler_script, self.model, model_size, *config.cmd_text_list, str(max_mbs), - self.save_path, self.suffix, self.version, self.is_full_tune] + kv_repeat = self.get_kv_repeat(config) + cmd_args_str = config.build_megtron_command_line_arguments(self.version, max_mbs, kv_repeat) + # 测量block内并行策略性能 tp sp ep cp + command = ['bash', self.profiler_script, self.model, model_size, config.exp_id, str(config.npu_used), + self.profiler_data_path, self.version.value, cmd_args_str, self.is_full_tune] process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, preexec_fn=os.setsid) mbs_limit = self.MAX_MBS - mbs_now = 1 + mbs_done = 0 torchrun_failed = False oom = False already_killed = False @@ -243,18 +350,18 @@ class TinkerScripter: logger.info(output) # 随地记录mbs - if "mbs = " in output: - mbs_now = int(re.search(r'mbs = (\d+)', output)[1]) + if " [Tinker Info] mbs " in output: + mbs_done = int(re.search(r'mbs (\d+)', output)[1]) # profile下爆显存,直接终止 if "NPU out of memory" in output and not already_killed: - mbs_limit = mbs_now + mbs_limit = mbs_done os.killpg(os.getpgid(process.pid), signal.SIGTERM) already_killed = True - logger.info(f'OOM when mbs is {mbs_now}') + logger.info(f'OOM when mbs is {mbs_done}') oom = True - if "[Tinker-Profiler] OOM when mbs" in output: + if "[Tinker-Profiler] OOM when mbs" in output or "Available memory is insufficient" in output: mbs_limit = int(re.search(r'\[Tinker-Profiler] OOM when mbs=(\d+)', output)[1]) if '.py FAILED' in output and not already_killed: @@ -265,7 +372,7 @@ class TinkerScripter: # 获取剩余的标准错误输出 self.post_process(mbs_limit, oom, process, torchrun_failed) - return mbs_limit + return mbs_limit + 1 def get_model_structure(args): @@ -280,6 +387,7 @@ def get_model_structure(args): setattr(model_args, key, True) model_args.group_query_attention = getattr(model_args, 'group_query_attention', False) model_args.num_query_groups = getattr(model_args, 'num_query_groups', 1) + model_args.num_query_groups = getattr(model_args, 'use_mcore_models', False) if args.max_npu is None: TinkerScripter.MAX_NPU = model_args.nproc_per_node # 删除用于传递单节点卡数的属性,该属性并不是模型参数 @@ -313,10 +421,6 @@ def run(args: argparse.Namespace): TinkerScripter.MAX_NPU = args.max_npu # (待改进)将TinkerScripter.MAX_MBS逻辑更换为可取的最大mbs取值(当前取不到) TinkerScripter.MAX_MBS = args.max_mbs + 1 - if args.prof_tp is not None: - TinkerScripter.TP = set(map(int, args.prof_tp.split(','))) - if args.prof_sp is not None: - TinkerScripter.SP = set(map(int, args.prof_sp.split(','))) pre_logging_text = [] tz = datetime.timezone(datetime.timedelta(hours=8)) @@ -337,8 +441,7 @@ def run(args: argparse.Namespace): model_args = get_model_structure(args) # 自动生成adapter gen_block_adapter(hasattr(model_args, 'use_mcore_models') and model_args.use_mcore_models) - profiler = TinkerScripter(model_name, model_size, suffix, args.save_path, args.version, model_args, - args.is_full_tune) + profiler = TinkerScripter(model_name, model_size, suffix, args, model_args, args.is_full_tune) # 生成参数空间 arg_space = profiler.get_arg_space() # 迭代获取config @@ -355,4 +458,4 @@ def run(args: argparse.Namespace): # 输出profile情况 logger.info(f'Profile Space Total Time: {time.time() - start_time:.2f}') logger.info(f'Profile Data Saved at {dir_path}') - return dir_path + return dir_path \ No newline at end of file diff --git a/profiler/msprof_analyze/tinker/search/cost_model.py b/profiler/msprof_analyze/tinker/search/cost_model.py index 09e9e6567..a19a8f1dc 100644 --- a/profiler/msprof_analyze/tinker/search/cost_model.py +++ b/profiler/msprof_analyze/tinker/search/cost_model.py @@ -32,7 +32,7 @@ from tinker.search.arguments import print_args, preprocess_args from tinker.search.data import TaskParam, SearchArgs, ResultArgs, Metrics from tinker.search.process import ResultOutputHandler from tinker.utils.block_args import BlockArgs, BlockCost, DetailedInfo -from tinker.utils.utils import load_infos, convert_to_pp_stage_block_idx +from tinker.utils.utils import load_infos, convert_to_pp_stage_block_idx,byte_to_mb from tinker.utils.logger import logger, init_log FeaturesType = ProfileArgs @@ -55,6 +55,10 @@ class ProfiledData: # 现在主要就用这玩意儿 self._block_data = defaultdict(dict) # type: Dict[FeaturesType, Dict[str, BlockCost]] + @property + def data_ready(self): + return bool(self._block_data) + @staticmethod def _get_data(datas: ProfileDataType, features: FeaturesType, block_name="") -> Union[Dict, FixedValueDict, float]: if features not in datas: @@ -104,11 +108,10 @@ class TinkerCostModel: self.inter_band = None # type: Optional[List[float]] self.intra_band = None # type: Optional[List[float]] self._read_band_time(args.profiled_data_path) - self.block_names = BlockNames(*self.profiled_data.get_block_names()) # 剔除transformer block块后的其余块个数 - self.num_other_block = len(fields(self.block_names)) - 1 self.num_procs_per_node = args.num_npus_per_node self.args = args + self.block_names = self._get_block_names() #BlockName(*self.profiled_data.get_block_name()) @property def _data_ready(self): @@ -176,7 +179,7 @@ class TinkerCostModel: @staticmethod def get_pp_range(num_npus, num_layers, p_args: ProfileArgs): for pp in range(1, min(num_layers, num_npus) + 1): - if num_npus % (p_args.npu_used * pp) == 0 and num_npus // p_args.tp // pp >= p_args.ep: + if num_npus % (p_args.npu_used * pp) == 0 and num_npus // p_args.tp // p_args.cp // pp >= p_args.ep: yield pp @staticmethod @@ -230,19 +233,26 @@ class TinkerCostModel: block_data = data[block_name] return BlockArgs(self.args, profiled_args, block_data) - def init_blocks(self, profile_args: ProfileArgs, num_layers: int) -> List[BlockArgs]: + def set_seq_length_relate_mem(self, profile_args:ProfileArgs, block_list: List[BlockArgs]): + if self.args.use_mcore_models: + block_list1[1].attention_mask_mem = 4 + else: + attention_mask_mem = byte_to_mb(self.args.seq_length * self.args.seq_length) + for block in block_list[1:-2]: + block.attention_mask_mem = attention_mask_mem + if profile_args.sp: + num_elements = self.args.seq_length *self.args.hiddensize * 2 /profile_args.cp + block_list[1].attention_mask_mem = byte_to_mb(num_elements + + def init_blocks(self, profile_args: ProfileArgs) -> List[BlockArgs]: """当前就是头处理 + 若干个block + 尾处理,调用时机确定 ProfileArgs 之后""" # 头处理块 - block_list = [self.get_block_args(self.block_names.pre, profile_args)] # type: List[BlockArgs] - # transformer block - block_list.extend([self.get_block_args(self.block_names.block, profile_args) for _ in range(num_layers)]) - # 尾处理块 - block_list.append(self.get_block_args(self.block_names.post1, profile_args)) - block_list.append(self.get_block_args(self.block_names.post2, profile_args)) - # transformer block 注入仅使用一次的 attention_mask 尺寸 - attention_mask_mem = self.args.seq_length * self.args.seq_length / 1024.0 / 1024.0 - for block in block_list[1:-2]: - block.attention_mask_mem = attention_mask_mem + block_list: List[BlockArgs] = [] + for block_name ,block_num in self.args.block_structure: + for i in range(block_num): + block_list.append(self.get_block_args(block_name, profile_args)) + #transformer block 注入仅使用一次的 与序列长度`seq_length`相关的内存占用 + self.set_seq_length_relate_mem(profile_args, block_list) return block_list def get_stage_status(self, current_blocks, num_npu_before, is_first_stage, is_last_stage): @@ -362,6 +372,28 @@ class TinkerCostModel: if self.inter_band is None and self.intra_band is None: raise RuntimeError("Intra bandwidth and intra bandwidth file are required.") self._band_data_ready = True + def _get_block_name(self): + return [block_name for block_name, block_num in self.args.block_structure] + + +def convert_to_pp_stage_block_idx(num_layer_list: List[int], num_all_blocks_len: int): + """ + 格式转换 + :param num_layer_list: 一种可能的划分方式, num_layer_list中的元素为每个stage的长度 + :param num_all_blocks_len: 加上头尾blocks的长度 + :return: + """ + interval_layer_list = list() + start_num = 1 + for stage_length in num_layer_list: + interval_layer_list.append((start_num, start_num + stage_length - 1)) + start_num += stage_length + # 处理首尾 + first_tuple = interval_layer_list[0] + interval_layer_list[0] = (0, first_tuple[1]) + last_tuple = interval_layer_list[-1] + interval_layer_list[-1] = (last_tuple[0], num_all_blocks_len - 1) + return interval_layer_list def run(args: argparse.Namespace): @@ -376,10 +408,11 @@ def run(args: argparse.Namespace): # 1. 实例化CostModel cost_model = TinkerCostModel(args) # 2. 从观测数据中用户指定的仿真所需数据 args -> pred_profiled_args - pred_profiled_args = ProfileArgs(tp=args.simu_tp, sp=args.simu_sp, ep=args.simu_ep, mbs=args.micro_batch_size) + pred_profiled_args = ProfileArgs(tp=args.simu_tp, sp=args.simu_sp, ep=args.simu_ep,cp=args.simu_cp, + extend=args.simu_extend, mbs=args.micro_batch_size) # 3. 计算开销 # 3.1 生成子图 - pred_blocks = cost_model.init_blocks(pred_profiled_args, args.num_layers) + pred_blocks = cost_model.init_blocks(pred_profiled_args) # 3.2 校验所给策略有效性 remainder = args.num_npus % (args.simu_pp * pred_profiled_args.tp) if remainder != 0: @@ -388,9 +421,9 @@ def run(args: argparse.Namespace): args.num_npus, args.simu_pp, pred_profiled_args.tp )) # 3.3 计算DP LBS,打包CostModel变量并刷新block - npu_used = pred_profiled_args.tp * args.simu_pp + npu_used = pred_profiled_args.tp * pred_profiled_args.cp * args.simu_pp if args.num_npus % npu_used: - raise ValueError("num_npus cannot be evenly divided by the parallel strategy, check tp pp") + raise ValueError("num_npus cannot be evenly divided by the parallel strategy, check tp cp pp") dp = args.num_npus // npu_used local_batch_size = dp * pred_profiled_args.mbs if args.global_batch_size % local_batch_size: diff --git a/profiler/msprof_analyze/tinker/utils/block_args.py b/profiler/msprof_analyze/tinker/utils/block_args.py index 05f4142a4..82124fea2 100644 --- a/profiler/msprof_analyze/tinker/utils/block_args.py +++ b/profiler/msprof_analyze/tinker/utils/block_args.py @@ -139,6 +139,7 @@ class BlockArgs: if 'chatglm' in args.model_name: args.bf16, args.fp16 = False, True self.bf16 = args.bf16 + self.mcore = getattr(args, 'use_mcore_models', False) @property def max_reserved_mem(self): @@ -179,7 +180,7 @@ class BlockArgs: input_mem = self.is_first * self.data.in_size activation_mem = self.data.in_size if self.recompute else self.data.act - dp_dist_opt = self.dp if self.dist_opt else 1 + dp_dist_opt = self.dp * self.profile_args.cp if self.dist_opt else 1 # memory 不同部分的计算 # 初始: (2 + bf16 + 2 / dp_dist_opt) * W @@ -222,4 +223,4 @@ class BlockCost: bwd_reserved: float # TODO 这2个变量 必给扔到别的地方 param_master: int = 2 - param_optimizer: int = 4 + param_optimizer: int = 4 \ No newline at end of file diff --git a/profiler/msprof_analyze/tinker/utils/config.py b/profiler/msprof_analyze/tinker/utils/config.py index 0c7bb0c4c..cc74959db 100644 --- a/profiler/msprof_analyze/tinker/utils/config.py +++ b/profiler/msprof_analyze/tinker/utils/config.py @@ -19,6 +19,7 @@ import argparse from functools import partial import json +from tinker.utils.constant import version_parse from tinker.utils.utils import extract_arg_value_from_json, check_path_exist, \ check_path_before_create, check_files_in_dir, check_file_suffix, project_root from tinker.utils.logger import logger @@ -26,7 +27,7 @@ from tinker.utils.constant import InitialValues TINKER_DIR = os.path.join(project_root(), 'tinker') CONFIG_PATH = os.path.join(TINKER_DIR, 'parameter_config.json') -test_free_args = ['prof_tp', 'prof_sp', 'pretrain_script_path_search'] +test_free_args = ['prof_tp', 'prof_sp', 'prof_ep', 'prof_cp', 'prof_extend', 'pretrain_script_path_search'] config_json_parameter = {} mode = "all" @@ -96,8 +97,14 @@ def add_profile_args(parser: argparse.ArgumentParser): help='directory to save profied data, default:`./profiled_data`') profile_group.add_argument('-tp', '--prof_tp', type=str, default=get_profile_arg("prof_tp"), help='specify the TP-value for profiling, default for all TP') + profile_group.add_argument('-cp', '--prof_cp', type=str, default=get_profile_arg("prof_cp"), + help='specify the ulysses CP-value for profiling, default for all TP') profile_group.add_argument('-sp', '--prof_sp', type=str, default=get_profile_arg("prof_sp"), - help='specify the SP-value for profiling, default for all SP') + help='specify the SP-value for profiling, default for both 0 & 1') + profile_group.add_argument('-ep', '--prof_ep', type=str, default=get_profile_arg("prof_sp"), + help='specify the EP-value for profiling, default for all EP') + profile_group.add_argument('-extend', '--prof_extend', type=str, default=get_profile_arg("prof_sp"), + help='specify the moe-tp-extend-ep for profiling, default for both 0 & 1') profile_group.add_argument('--max_mbs', type=int, default=get_profile_arg("max_mbs"), help='specify the max mbs for profiling, default: 65536') profile_group.add_argument('-i', '--task_id', type=str, default=get_profile_arg("task_id"), @@ -145,8 +152,12 @@ def add_simulate_args(parser: argparse.ArgumentParser): help='tensor parallel') simulate_group.add_argument('--simu_pp', type=int, default=get_simulate_arg("simu_pp"), help='pipeline parallel') + simulate_group.add_argument('--simu_cp', type=int, default=get_simulate_arg("simu_cp"), + help='ulysses context parallel') simulate_group.add_argument('--simu_ep', type=int, default=get_simulate_arg("simu_ep"), help='expert parallel') + simulate_group.add_argument('--simu_extend', type=int, default=get_simulate_arg("simu_extend"), + help='--moe-tp-extend-ep') simulate_group.add_argument('--simu_sp', type=int, default=get_simulate_arg("simu_sp"), help='sequence parallel') simulate_group.add_argument('--dist_opt', type=int, default=get_simulate_arg("dist_opt"), @@ -240,6 +251,8 @@ def check_args(args: argparse.Namespace) -> argparse.Namespace: args.is_full_tune = True check_args_none(args) + if isinstance(args.version, str): + args.version = version_parse(args.version) process_path(args) check_layers(args) check_path_valid(args.mode) diff --git a/profiler/msprof_analyze/tinker/utils/constant.py b/profiler/msprof_analyze/tinker/utils/constant.py index 3ac44fc24..8d05212bf 100644 --- a/profiler/msprof_analyze/tinker/utils/constant.py +++ b/profiler/msprof_analyze/tinker/utils/constant.py @@ -19,22 +19,24 @@ from typing import Dict class Version(enum.Enum): - MindSpeed_LLM_1_0_rc1 = "1.0" - MindSpeed_LLM_1_0_rc2 = "1.1" - MindSpeed_LLM_1_0_rc3 = "1.2" + ML_1_0_rc1 = "1.0.rc1" + ML_1_0_rc2 = "1.0.rc2" + ML_1_0_rc3 = "1.0.rc3" + ML_1_0_0 = "1.0.0" + ML_2_0_0 = "2.0.0" def __str__(self): return self.value + @classmethod + def is_valid_value(cls,value: str) -> bool: + return value in cls._value2member_map_ VERSION_ALIASES: Dict[str, Version] = { # 映射标准化版本 - "1.0": Version.MindSpeed_LLM_1_0_rc1, - "1.0.RC1": Version.MindSpeed_LLM_1_0_rc1, - "1.1": Version.MindSpeed_LLM_1_0_rc2, - "1.0.RC2": Version.MindSpeed_LLM_1_0_rc2, - "1.2": Version.MindSpeed_LLM_1_0_rc3, - "1.0.RC3": Version.MindSpeed_LLM_1_0_rc3, + "1.0": Version.ML_1_0_rc1, + "1.1": Version.ML_1_0_rc2, + "1.2": Version.ML_1_0_rc3, } PYTHON_STANDARD_INDENT = ' ' * 4 @@ -43,12 +45,14 @@ MODULE_NAME = 'genned_block_forward' def version_parse(version_str: str) -> Version: - normalized_str = version_str.strip().upper() - if normalized_str.startswith('V'): + normalized_str = version_str.strip().lower() + if normalized_str.startswith('v'): normalized_str = normalized_str[1:] if normalized_str not in VERSION_ALIASES: - raise ValueError(f"Unrecognized version: {version_str}, supported versions: {VERSION_ALIASES.keys()}") - return VERSION_ALIASES[normalized_str] + return VERSION_ALIASES[normalized_str] + if Version.is_valid_value(normalized_str): + return Version(normalized_str) + raise ValueError(f"Unrecognized version: {version_str}, supported versions: {VERSION_ALIASES.keys()}") @dataclass diff --git a/profiler/msprof_analyze/tinker/utils/utils.py b/profiler/msprof_analyze/tinker/utils/utils.py index e06883b7a..e9a016ac5 100644 --- a/profiler/msprof_analyze/tinker/utils/utils.py +++ b/profiler/msprof_analyze/tinker/utils/utils.py @@ -163,12 +163,32 @@ def write_lines(final_res: list, dest_file: str): raise RuntimeError(f'write to file: {dest_file} failed.') from e +def print_result(user_args, config_result_pair): + strategy, metric = config_result_pair + time_costs = [round(time_cost / 1000, 3) for time_cost in metric.time_costs] + mem_costs = [round(mem_cost, 2) for mem_cost in metric.mem_costs] + tokens_per_npu_per_sec = (user_args.seq_length * user_args.global_batch_size / user_args.num_npus / + metric.time_cost * 1000000) + info_text = (f'token/npu/sec = {tokens_per_npu_per_sec:.1f}, time_cost = {metric.time_cost / 1000:.3f}, ' + f'mem_cost = {metric.mem_cost:.2f}, ') + + value_dict = dict(tp=strategy.tp, pp=strategy.pp, dp=strategy.dp, cp=strategy.cp, sp=strategy.sp, ep=strategy.ep, + extend=strategy.extend, + zero=strategy.zero, mbs=strategy.mbs,recompute=strategy.rc, + num_layer_list=list(map(int, strategy.num_layers.split(','))), + time_costs=time_costs, mem_costs=mem_costs) + # 先加后删 + if not strategy.is_moe: + del value_dict['ep'] + info_text += ', '.join([f'{k}={v}' for k, v in value_dict.items()]) + logger.info(info_text) + + def load_infos(args): """读取前序流程保存的模型结构等信息,并保存到全局变量args中""" model_info = find_files(args.profiled_data_path, 'model_info*.json') if not model_info: - logger.info('model_info未找到,seq_length取4096') - args.seq_length = 4096 + raise RuntimeError('model_info未找到, Profiler阶段异常') else: with open(model_info, 'r') as file: data = json.load(file) @@ -176,6 +196,8 @@ def load_infos(args): if k == 'num_layers' and v == 1: # 留None 避免存下减层后脚本跑出来的num_layers: 1 continue setattr(args, k, v) + if not hasattr(args, "use_mcore_models"): + args.use_mcore_models = False # 记录使用测量数据基于的感知器版本 task_info = find_files(args.profiled_data_path, 'VERSION*.json') @@ -184,13 +206,16 @@ def load_infos(args): data = json.load(file) if 'version_profiler' not in data: args.version_profiler = data['version'] - args.version_framework = Version.MindSpeed_LLM_1_0_rc3 + args.version_framework = Version.ML_1_0_rc3 else: args.version_profiler = data['version_profiler'] + args.version_framework = Version.ML_1_0_rc3 args.model_name = data.get('model_name') args.model_size = data.get('model_size') if args.pretrain_script_path_search is None: args.pretrain_script_path = data.get('pretrain_script_path') + if isinstance(args.version, str): + args.version = version_parse(args.version) args.version_optimizer = optimizer_version() diff --git a/profiler/msprof_analyze/tinker/version.py b/profiler/msprof_analyze/tinker/version.py index 6ae90438c..7a95b1746 100644 --- a/profiler/msprof_analyze/tinker/version.py +++ b/profiler/msprof_analyze/tinker/version.py @@ -17,14 +17,16 @@ from datetime import datetime, timedelta, timezone import json import os +from tinker.utils.constant import Verison + # MAJOR和MAJOR编号目前暂时跟随ModelLink, 如1.2指支持到1.0.RC2, PATCH版本随时更新 PROFILER_VERSION_MAJOR = 1 PROFILER_VERSION_MINOR = 3 -PROFILER_VERSION_PATCH = 3 +PROFILER_VERSION_PATCH = 'dsv3' OPTIMIZER_VERSION_MAJOR = 1 OPTIMIZER_VERSION_MINOR = 3 -OPTIMIZER_VERSION_PATCH = 0 +OPTIMIZER_VERSION_PATCH = 'dsv3' def profiler_version(): @@ -51,5 +53,7 @@ def dump_task_info(dir_path, infos=None, cm=False): if infos: version_info.update(infos) os.makedirs(dir_path, exist_ok=True) + if isinstance(version_info['version'], Verison): + version_info['version'] = version_info['version'].value with open(os.path.join(dir_path, f'VERSION_{version}.json'), 'w') as version_file: - json.dump(version_info, version_file, indent=4) + json.dump(version_info, version_file, indent=4,ensure_ascii=False) -- Gitee From bd21a214db8cf9f68b98a0765ab7f2eb7b0410b9 Mon Sep 17 00:00:00 2001 From: zhangning Date: Tue, 26 Aug 2025 09:15:34 +0800 Subject: [PATCH 2/9] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dbug,=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E5=AF=B9moe=E7=BB=93=E6=9E=84=E6=A8=A1=E5=9E=8B=E8=BF=9B?= =?UTF-8?q?=E8=A1=8C=E5=AF=BB=E4=BC=98=EF=BC=8C=E5=A2=9E=E5=8A=A0cp?= =?UTF-8?q?=E5=AF=BB=E4=BC=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../framework_adapter/modellink_adapter.py | 4 +- .../tinker/model/adapter_utils.py | 56 +++++++++++++------ .../tinker/model/block_adapters.py | 4 +- .../tinker/profiler/block_profiler.py | 7 +-- .../msprof_analyze/tinker/profiler/profile.sh | 20 +++---- .../tinker/profiler/profile_classes.py | 2 + .../tinker/profiler/profile_space.py | 12 ++-- .../tinker/search/cost_model.py | 4 +- .../msprof_analyze/tinker/search/optimize.py | 17 ++++-- .../msprof_analyze/tinker/utils/constant.py | 1 + profiler/msprof_analyze/tinker/version.py | 4 +- 11 files changed, 80 insertions(+), 51 deletions(-) diff --git a/profiler/msprof_analyze/tinker/framework_adapter/modellink_adapter.py b/profiler/msprof_analyze/tinker/framework_adapter/modellink_adapter.py index 81686566d..bb31b7c28 100644 --- a/profiler/msprof_analyze/tinker/framework_adapter/modellink_adapter.py +++ b/profiler/msprof_analyze/tinker/framework_adapter/modellink_adapter.py @@ -205,14 +205,14 @@ class ModelLinkAdapter100(ModelLinkAdapter11): 1.0.0相关接口 """ def __init__(self): - super.__init__() + super().__init__() self.original_autograd = None def initialize(self): from modellink.training.initialize import initialize_megatron initialize_megatron() def pre_time_profile_backward_step(self): from megatron.core.pipeline_parallel import schedules - from tinker.megtron_patch.schedules import custom_backward + from tinker.megatron_patch.schedules import custom_backward if self.original_custom_backward is None: self.original_custom_backward = schedules.custom_backward schedules.custom_backward = custom_backward diff --git a/profiler/msprof_analyze/tinker/model/adapter_utils.py b/profiler/msprof_analyze/tinker/model/adapter_utils.py index cea4f19fa..5cb47c05b 100644 --- a/profiler/msprof_analyze/tinker/model/adapter_utils.py +++ b/profiler/msprof_analyze/tinker/model/adapter_utils.py @@ -20,21 +20,17 @@ import os import sys import textwrap from ctypes.macholib.framework import framework_info -from typing import List, Tuple, Dict +from typing import List, Tuple, Dict, Set import astor from tinker.model.block_adapters import BlockAdapter, legacy_block_adapters, mcore_block_adapters, \ - mcore_block_adapter_1_1_rc1 + mcore_block_adapters_1_1_rc1 from tinker.utils.config import TINKER_DIR from tinker.utils.constant import Version from tinker.utils.logger import logger from tinker.utils.utils import write_lines, project_root, find_keywords_line_idx, get_lines, read_file, path_to_package, \ - check_path_exit - -from profiler.msprof_analyze.setup import version -from profiler.msprof_analyze.tinker.model.block_infos import get_block_adapters -from tinker.utils.utils import write_lines, project_root, find_keywords_line_idx, get_lines, read_file, path_to_package + check_path_exist from tinker.utils.constant import MODULE_NAME, PYTHON_STANDARD_INDENT PYTHON_STANDARD_INDENT = ' ' * 4 @@ -45,7 +41,7 @@ block_adapter_file_path = os.path.join(TINKER_DIR, f'model/{MODULE_NAME}.py') def get_framework_path(version:str) ->str: """临时方案""" framework_path = os.path.join(project_root(), 'modellink-ref', f'modellink-{version}') - check_path_exit(framework_path) + check_path_exist(framework_path) return framework_path @@ -203,18 +199,42 @@ def get_import_code_str(module_obj_list): return '\n'.join(all_imports) -def get_module_methods(module_obj): +def get_module_methods(module_obj) -> Dict[str, str]: """ 获取模块对象所有方法和源码的组合 :param module_obj: 模块对象 :return: """ - functions = { - name: inspect.getsource(getattr(module_obj, name)) - for name in dir(module_obj) - if inspect.isfunction(getattr(module_obj, name)) - } - return functions + module_file = inspect.getsourcefile(module_obj) + if not module_file: + return {} + methods: Dict[str, str] = {} + for name, obj in inspect.getmembers(module_obj, inspect.isfunction): + if getattr(obj, "__module__", None) == module_obj.__name__L + obj_file = inspect.getsourcefile(obj) + if obj_file == module_file: + try: + methods[name] = inspect.getsource(obj) + except OSError: + pass + return methods + +class _CallNameCollector(ast.NodeVisitor): + def __init__(self): + self.called_names: Set[str] = set() + def visit_Call(self, node: ast.Call): + if isinstance(node.func, ast.Name): + self.called_names.add(node.func.id) + self.generic_visit(node) + +def collect_called_names(code:str) -> Set[str]: + try: + tree = ast.parse(code) + except SyntaxError: + return set() + visitor = _CallNameCollector() + visitor.visit(tree) + return visitor.called_names def error_free_import(): @@ -241,9 +261,9 @@ def gen_block_adapter(use_mcore_models): :param use_mcore_models: 是否使用 mcore :return: """ - package_path = get_framework_path(version) + package_path = os.getenv('ML_PATH', None) if not package_path: - raise RuntimeError(f'The version is not supported: {version}') + raise RuntimeError("ML_PATH is not set") if not os.path.exists(package_path): raise RuntimeError(f'The package path is not exist: {package_path}') # 这里特殊处理一下1.0 版本的patch,以防止后续 MethodLocation 导入报错 @@ -281,7 +301,7 @@ def gen_block_adapter(use_mcore_models): raise write_lines(file_content.splitlines(), block_adapter_file_path) -def get_block_adapters(version, use_mcore_models) +def get_block_adapters(version, use_mcore_models): if use_mcore_models: if version == Version.ML_2_0_0: return mcore_block_adapters_1_1_rc1 diff --git a/profiler/msprof_analyze/tinker/model/block_adapters.py b/profiler/msprof_analyze/tinker/model/block_adapters.py index 4d3555126..bb6221aac 100644 --- a/profiler/msprof_analyze/tinker/model/block_adapters.py +++ b/profiler/msprof_analyze/tinker/model/block_adapters.py @@ -191,10 +191,10 @@ mcore_block_adapters = [ input_source=[ParamSource('hidden_states'), ParamSource('input_ids', from_forward=False), ParamSource('position_ids', from_forward=False), - ParamSource('attention_mask', from_forward=False) + ParamSource('attention_mask', from_forward=False), ParamSource('labels', from_forward=False)] ) ] -mcore_block_adapters_1_1_rc1 = [adapter for adaptor in mcore_block_adapters] +mcore_block_adapters_1_1_rc1 = [adapter for adapter in mcore_block_adapters] mcore_block_adapters_1_1_rc1[3] = replace(mcore_block_adapters[3],method_location=replace(mcore_block_adapters[3].method_location,start_key_word='attention_mask)')) \ No newline at end of file diff --git a/profiler/msprof_analyze/tinker/profiler/block_profiler.py b/profiler/msprof_analyze/tinker/profiler/block_profiler.py index 26be9f7a9..dce044ca7 100644 --- a/profiler/msprof_analyze/tinker/profiler/block_profiler.py +++ b/profiler/msprof_analyze/tinker/profiler/block_profiler.py @@ -423,7 +423,6 @@ class TinkerProfiler: def update_tensor_map(self, block_infos: List[BlockInfo], mbs): forward_output = None - InitTensorInfo = namedtuple('InitTensorInfo', ['shape', 'requires_grad', 'device', 'dtype', 'element_size']) # 初始化 if self.block_tensor_map is None: first_input = self._get_first_input() @@ -437,8 +436,8 @@ class TinkerProfiler: torch.distributed.barrier() forward_output = wrapped_block(input_tensors) profile_logger.debug(f'[Tinker.Debug] tensor信息获取成功 - {block_info.name}') - extract_input_tensors_info = self.extract_input_tensors(input_tensors, InitTensorInfo) - extract_output_tensors_info = self.extract_input_tensors(forward_output, InitTensorInfo) + extract_input_tensors_info = self.extract_input_tensors(input_tensors) + extract_output_tensors_info = self.extract_input_tensors(forward_output) block_tensor_map[block_info.name] = extract_input_tensors_info, extract_output_tensors_info self.block_tensor_map = block_tensor_map self.last_mbs = mbs @@ -502,7 +501,7 @@ class TinkerProfiler: while self._should_run(block_info,run_times): run_times += 1 oom = self._get_result(block_info, oom) - self.dump_profiled_results() + self.dump_profiled_results(block_infos) return not oom def _get_result(self, block_info, oom): diff --git a/profiler/msprof_analyze/tinker/profiler/profile.sh b/profiler/msprof_analyze/tinker/profiler/profile.sh index e4a7900ee..c969a401c 100644 --- a/profiler/msprof_analyze/tinker/profiler/profile.sh +++ b/profiler/msprof_analyze/tinker/profiler/profile.sh @@ -20,25 +20,25 @@ if [[ $(pwd) == *"ma-user"* ]]; then fi # 1. 命令行入参校验 若异常则提示检查`profile_space.py` -if [ "$#" -lt 7 ]; then +if [ "$#" -lt 8 ]; then echo "Error: Script profile.sh requires at least 7 arguments, but get $# arguments" echo " Supposed arguments: model_name model_size exp_id num_npu profiled_data_path ml_version profiled_args" echo " Please check TinkerScripter.run_config() in profile_space.py" exit 1 fi -MODEL_NAME = ${1} -MODEL_SIZE = ${2} -EXP_ID = ${3} -NPROC = ${4} -PROFILING_PATH = ${5} +MODEL_NAME=${1} +MODEL_SIZE=${2} +EXP_ID=${3} +NPROC=${4} +PROFILING_PATH=${5} #profiler 需要使用的训练框架版本信息 -export ML_VERSION =${6} -TINKER_ARGS = ${7} +export ML_VERSION=${6} +TINKER_ARGS=${7} export IS_TUNE=${10:-0} #可调节的profiler超参,目前看取3-10和10-40无影响 -WARMUP_TIMES = 3 -REPEAT_TIMES =10 +WARMUP_TIMES=3 +REPEAT_TIMES=10 echo "=====================================work on ${EXP_ID} =======================================" #2. 允许在其他路径使用tinker diff --git a/profiler/msprof_analyze/tinker/profiler/profile_classes.py b/profiler/msprof_analyze/tinker/profiler/profile_classes.py index 2fd5419c5..5abf705b7 100644 --- a/profiler/msprof_analyze/tinker/profiler/profile_classes.py +++ b/profiler/msprof_analyze/tinker/profiler/profile_classes.py @@ -25,6 +25,8 @@ class ScriptArgs: tp: int = 1 sp: int = 0 ep: int = 1 + cp: int = 1 + extend: int = 0 @property def cmd_text_list(self): diff --git a/profiler/msprof_analyze/tinker/profiler/profile_space.py b/profiler/msprof_analyze/tinker/profiler/profile_space.py index 71e7ec130..6bfebfa83 100644 --- a/profiler/msprof_analyze/tinker/profiler/profile_space.py +++ b/profiler/msprof_analyze/tinker/profiler/profile_space.py @@ -132,7 +132,7 @@ class ScriptArgs: return not any(illegal) def build_megtron_command_line_arguments(self, version, max_mbs, kv_repeat): - result = [f'--tensor-model-parallel-size {self.tp}', f'--pipeline-model-parallel-size 1' + result = [f'--tensor-model-parallel-size {self.tp}', f'--pipeline-model-parallel-size 1', f'--distributed-timeout-minutes 5', f'--prof-mbs-limit {max_mbs}'] if self.ep > 1 : result.append(f'--expert-model-parallel-size {self.ep}') @@ -149,7 +149,7 @@ class ScriptArgs: result.append(f'--kv-head-repeat-before-uly-alltoall') if self.extend: result.append(f'--moe-tp-extend-ep') - result.append('--moe-alltoall-overlap-comn') + #result.append('--moe-alltoall-overlap-comn') return '\n'.join(result) @@ -271,7 +271,7 @@ class TinkerScripter: @property def can_extend(self): - return self.version in [Version.ML_2_0_0] + return self.version in [Version.ML_2_0_0] and self.can_ep @staticmethod def post_process(mbs_limit, oom, process, torchrun_failed): @@ -325,7 +325,7 @@ class TinkerScripter: cmd_args_str = config.build_megtron_command_line_arguments(self.version, max_mbs, kv_repeat) # 测量block内并行策略性能 tp sp ep cp command = ['bash', self.profiler_script, self.model, model_size, config.exp_id, str(config.npu_used), - self.profiler_data_path, self.version.value, cmd_args_str, self.is_full_tune] + self.profiled_data_path, self.version.value, cmd_args_str, self.is_full_tune] process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, preexec_fn=os.setsid) mbs_limit = self.MAX_MBS @@ -387,7 +387,7 @@ def get_model_structure(args): setattr(model_args, key, True) model_args.group_query_attention = getattr(model_args, 'group_query_attention', False) model_args.num_query_groups = getattr(model_args, 'num_query_groups', 1) - model_args.num_query_groups = getattr(model_args, 'use_mcore_models', False) + model_args.use_mcore_models = getattr(model_args, 'use_mcore_models', False) if args.max_npu is None: TinkerScripter.MAX_NPU = model_args.nproc_per_node # 删除用于传递单节点卡数的属性,该属性并不是模型参数 @@ -440,7 +440,7 @@ def run(args: argparse.Namespace): pre_log(pre_logging_text) model_args = get_model_structure(args) # 自动生成adapter - gen_block_adapter(hasattr(model_args, 'use_mcore_models') and model_args.use_mcore_models) + gen_block_adapter(args.version,hasattr(model_args, 'use_mcore_models') and model_args.use_mcore_models) profiler = TinkerScripter(model_name, model_size, suffix, args, model_args, args.is_full_tune) # 生成参数空间 arg_space = profiler.get_arg_space() diff --git a/profiler/msprof_analyze/tinker/search/cost_model.py b/profiler/msprof_analyze/tinker/search/cost_model.py index a19a8f1dc..6d5ee9f68 100644 --- a/profiler/msprof_analyze/tinker/search/cost_model.py +++ b/profiler/msprof_analyze/tinker/search/cost_model.py @@ -242,7 +242,7 @@ class TinkerCostModel: block.attention_mask_mem = attention_mask_mem if profile_args.sp: num_elements = self.args.seq_length *self.args.hiddensize * 2 /profile_args.cp - block_list[1].attention_mask_mem = byte_to_mb(num_elements + block_list[1].attention_mask_mem = byte_to_mb(num_elements) def init_blocks(self, profile_args: ProfileArgs) -> List[BlockArgs]: """当前就是头处理 + 若干个block + 尾处理,调用时机确定 ProfileArgs 之后""" @@ -372,7 +372,7 @@ class TinkerCostModel: if self.inter_band is None and self.intra_band is None: raise RuntimeError("Intra bandwidth and intra bandwidth file are required.") self._band_data_ready = True - def _get_block_name(self): + def _get_block_names(self): return [block_name for block_name, block_num in self.args.block_structure] diff --git a/profiler/msprof_analyze/tinker/search/optimize.py b/profiler/msprof_analyze/tinker/search/optimize.py index bd6f844ff..e3dbb85fc 100644 --- a/profiler/msprof_analyze/tinker/search/optimize.py +++ b/profiler/msprof_analyze/tinker/search/optimize.py @@ -31,6 +31,7 @@ from tinker.search.cost_model import TinkerCostModel from tinker.search.arguments import print_args, preprocess_args from tinker.utils.utils import read_file, load_infos, convert_to_num_layers +from tinker.utils.constant import Version from tinker.utils.logger import logger, init_log MAX_FLOAT = 1.0e9 @@ -113,15 +114,21 @@ class TinkerOptimizer(Optimizer): if num_npus % profiled_args.tp: continue # stage变量的搜索空间生成 - pp_space = TinkerCostModel.get_pp_range(num_npus, args.num_layers, profiled_args) # type: Iterator - dist_opt_space = [0] if isinstance(profiled_args.ep, int) and profiled_args.ep > 1 else [0, 1] + num_layers = sum(num_layer for _, num_layer in args.block_structure) + pp_space = TinkerCostModel.get_pp_range(num_npus, num_layers, profiled_args) # type: Iterator + if isinstance(profiled_args.ep, int) and profiled_args.ep > 1 and args.version == Verison.ML_1_0_rc1: + dist_opt_space = [0] + else: + dist_opt_space = [0, 1] recompute_space = [0, 1] # TODO 支持逐block重计算,当前使用统一full recompute # 生成任务队列 for pp, dist_opt, recompute in itertools.product(pp_space, dist_opt_space, recompute_space): - dp = num_npus // pp // profiled_args.tp + dp = num_npus // pp // profiled_args.tp // profiled_args.cp + if profiled_args.ep >dp: + continue local_batch_size = dp * profiled_args.mbs - if args.global_batch_size % local_batch_size or dp == 1 and dist_opt: + if args.global_batch_size % local_batch_size or dp * profiled_args.cp == 1 and dist_opt: continue search_args = SearchArgs( pp=pp, @@ -130,7 +137,7 @@ class TinkerOptimizer(Optimizer): dist_opt=dist_opt, **profiled_args.__dict__ # 继承 profiled_args 的所有属性 ) - blocks = self.cost_model.init_blocks(profiled_args, self.user_args.num_layers) + blocks = self.cost_model.init_blocks(profiled_args) for block in blocks: block.update_cost_model_args({ "dp": dp, diff --git a/profiler/msprof_analyze/tinker/utils/constant.py b/profiler/msprof_analyze/tinker/utils/constant.py index 8d05212bf..620afabe5 100644 --- a/profiler/msprof_analyze/tinker/utils/constant.py +++ b/profiler/msprof_analyze/tinker/utils/constant.py @@ -37,6 +37,7 @@ VERSION_ALIASES: Dict[str, Version] = { "1.0": Version.ML_1_0_rc1, "1.1": Version.ML_1_0_rc2, "1.2": Version.ML_1_0_rc3, + "2.0.0": Version.ML_2_0_0, } PYTHON_STANDARD_INDENT = ' ' * 4 diff --git a/profiler/msprof_analyze/tinker/version.py b/profiler/msprof_analyze/tinker/version.py index 7a95b1746..65467169a 100644 --- a/profiler/msprof_analyze/tinker/version.py +++ b/profiler/msprof_analyze/tinker/version.py @@ -17,7 +17,7 @@ from datetime import datetime, timedelta, timezone import json import os -from tinker.utils.constant import Verison +from tinker.utils.constant import Version # MAJOR和MAJOR编号目前暂时跟随ModelLink, 如1.2指支持到1.0.RC2, PATCH版本随时更新 PROFILER_VERSION_MAJOR = 1 @@ -53,7 +53,7 @@ def dump_task_info(dir_path, infos=None, cm=False): if infos: version_info.update(infos) os.makedirs(dir_path, exist_ok=True) - if isinstance(version_info['version'], Verison): + if isinstance(version_info['version'], Version): version_info['version'] = version_info['version'].value with open(os.path.join(dir_path, f'VERSION_{version}.json'), 'w') as version_file: json.dump(version_info, version_file, indent=4,ensure_ascii=False) -- Gitee From a5a2135f6e8dcc38ebcf6228f425b328491c1150 Mon Sep 17 00:00:00 2001 From: zhangning Date: Tue, 26 Aug 2025 09:20:25 +0800 Subject: [PATCH 3/9] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E6=96=87=E4=BB=B6=EF=BC=8C=E6=94=AF=E6=8C=81tp=E3=80=81cp?= =?UTF-8?q?=E7=9B=B8=E5=85=B3=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- profiler/msprof_analyze/tinker/parameter_config.json | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/profiler/msprof_analyze/tinker/parameter_config.json b/profiler/msprof_analyze/tinker/parameter_config.json index 7385002e2..7ea64a32f 100644 --- a/profiler/msprof_analyze/tinker/parameter_config.json +++ b/profiler/msprof_analyze/tinker/parameter_config.json @@ -3,10 +3,13 @@ "model_name": "example", "model_size": "7b", "pretrain_script_path": "./pretrain_qwen15_7b_ptd.sh", - "version": "1.2", + "version": "2.0.0", "save_path": "./profiled_data", "prof_tp": null, "prof_sp": null, + "prof_cp": null, + "prof_ep": null, + "prof_extend": 0, "max_mbs": 65536, "task_id": "test", "max_npu": 8 @@ -30,6 +33,8 @@ "simu_pp": 1, "simu_ep": 1, "simu_sp": 0, + "simu_cp": 0, + "simu_extend": 0, "dist_opt": 0, "micro_batch_size": 1, "num_layer_list": null, -- Gitee From 2f0971d2496ce20d5d464fa11303df0de6da76c6 Mon Sep 17 00:00:00 2001 From: zhangning Date: Tue, 26 Aug 2025 09:23:31 +0800 Subject: [PATCH 4/9] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E6=96=87=E4=BB=B6=EF=BC=8C=E6=94=AF=E6=8C=81tp=E3=80=81cp?= =?UTF-8?q?=E7=9B=B8=E5=85=B3=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- profiler/msprof_analyze/tinker/parameter_config.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/profiler/msprof_analyze/tinker/parameter_config.json b/profiler/msprof_analyze/tinker/parameter_config.json index 7ea64a32f..bb6242239 100644 --- a/profiler/msprof_analyze/tinker/parameter_config.json +++ b/profiler/msprof_analyze/tinker/parameter_config.json @@ -10,7 +10,7 @@ "prof_cp": null, "prof_ep": null, "prof_extend": 0, - "max_mbs": 65536, + "max_mbs": 2, "task_id": "test", "max_npu": 8 }, -- Gitee From 8ff3d1a8e9d91e0a006884cab7c0eb515b7ad151 Mon Sep 17 00:00:00 2001 From: zhangning Date: Tue, 26 Aug 2025 10:05:06 +0800 Subject: [PATCH 5/9] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dbug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- profiler/msprof_analyze/tinker/model/adapter_utils.py | 4 ++-- profiler/msprof_analyze/tinker/profiler/profile.sh | 2 +- profiler/msprof_analyze/tinker/search/optimize.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/profiler/msprof_analyze/tinker/model/adapter_utils.py b/profiler/msprof_analyze/tinker/model/adapter_utils.py index 5cb47c05b..ef9c20a4a 100644 --- a/profiler/msprof_analyze/tinker/model/adapter_utils.py +++ b/profiler/msprof_analyze/tinker/model/adapter_utils.py @@ -210,7 +210,7 @@ def get_module_methods(module_obj) -> Dict[str, str]: return {} methods: Dict[str, str] = {} for name, obj in inspect.getmembers(module_obj, inspect.isfunction): - if getattr(obj, "__module__", None) == module_obj.__name__L + if getattr(obj, "__module__", None) == module_obj.__name__: obj_file = inspect.getsourcefile(obj) if obj_file == module_file: try: @@ -254,7 +254,7 @@ def error_free_import(): raise RuntimeError(f'No available patch framework') -def gen_block_adapter(use_mcore_models): +def gen_block_adapter(version,use_mcore_models): """ 从用户提供的版本以及是否启用mcore_model,动态生成适配每个版本的 block_adapter :param version: 版本号 diff --git a/profiler/msprof_analyze/tinker/profiler/profile.sh b/profiler/msprof_analyze/tinker/profiler/profile.sh index c969a401c..cf999d8d7 100644 --- a/profiler/msprof_analyze/tinker/profiler/profile.sh +++ b/profiler/msprof_analyze/tinker/profiler/profile.sh @@ -35,7 +35,7 @@ PROFILING_PATH=${5} #profiler 需要使用的训练框架版本信息 export ML_VERSION=${6} TINKER_ARGS=${7} -export IS_TUNE=${10:-0} +export IS_TUNE=${8} #可调节的profiler超参,目前看取3-10和10-40无影响 WARMUP_TIMES=3 REPEAT_TIMES=10 diff --git a/profiler/msprof_analyze/tinker/search/optimize.py b/profiler/msprof_analyze/tinker/search/optimize.py index e3dbb85fc..45b53c924 100644 --- a/profiler/msprof_analyze/tinker/search/optimize.py +++ b/profiler/msprof_analyze/tinker/search/optimize.py @@ -116,7 +116,7 @@ class TinkerOptimizer(Optimizer): # stage变量的搜索空间生成 num_layers = sum(num_layer for _, num_layer in args.block_structure) pp_space = TinkerCostModel.get_pp_range(num_npus, num_layers, profiled_args) # type: Iterator - if isinstance(profiled_args.ep, int) and profiled_args.ep > 1 and args.version == Verison.ML_1_0_rc1: + if isinstance(profiled_args.ep, int) and profiled_args.ep > 1 and args.version == Version.ML_1_0_rc1: dist_opt_space = [0] else: dist_opt_space = [0, 1] -- Gitee From 72ad20583826b8f2e883bb99734b7a04b5bbc259 Mon Sep 17 00:00:00 2001 From: zhangning Date: Tue, 26 Aug 2025 10:05:06 +0800 Subject: [PATCH 6/9] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dcodecheck=E6=A3=80?= =?UTF-8?q?=E6=9F=A5=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../framework_adapter/modellink_adapter.py | 5 +++++ .../tinker/megatron_patch/arguments.py | 2 ++ .../tinker/megatron_patch/microbatches.py | 6 +++--- .../tinker/model/adapter_utils.py | 19 +++++++++++++------ .../tinker/model/block_adapters.py | 2 +- .../tinker/profiler/block_profiler.py | 15 +++++++-------- .../msprof_analyze/tinker/profiler/profile.sh | 2 +- .../tinker/profiler/profile_space.py | 19 +++++++++---------- .../tinker/search/cost_model.py | 18 ++++++++++-------- .../msprof_analyze/tinker/search/optimize.py | 4 ++-- .../msprof_analyze/tinker/utils/constant.py | 2 +- profiler/msprof_analyze/tinker/utils/utils.py | 2 +- profiler/msprof_analyze/tinker/version.py | 7 ++++--- 13 files changed, 59 insertions(+), 44 deletions(-) diff --git a/profiler/msprof_analyze/tinker/framework_adapter/modellink_adapter.py b/profiler/msprof_analyze/tinker/framework_adapter/modellink_adapter.py index bb31b7c28..9bc61597e 100644 --- a/profiler/msprof_analyze/tinker/framework_adapter/modellink_adapter.py +++ b/profiler/msprof_analyze/tinker/framework_adapter/modellink_adapter.py @@ -29,6 +29,7 @@ import torch from tinker.utils.constant import Version, version_parse + class ModelLinkAdapter(ABC): @abstractmethod def get_args(self): @@ -207,15 +208,18 @@ class ModelLinkAdapter100(ModelLinkAdapter11): def __init__(self): super().__init__() self.original_autograd = None + def initialize(self): from modellink.training.initialize import initialize_megatron initialize_megatron() + def pre_time_profile_backward_step(self): from megatron.core.pipeline_parallel import schedules from tinker.megatron_patch.schedules import custom_backward if self.original_custom_backward is None: self.original_custom_backward = schedules.custom_backward schedules.custom_backward = custom_backward + def pre_mem_profile_backward_step(self): from megatron.core.pipeline_parallel import schedules schedules.custom_backward = self.original_custom_backward @@ -236,6 +240,7 @@ version_map: Dict[str, Type[ModelLinkAdapter]] = { '2.0.0': ModelLinkAdapter200 } + class ModelLinkAdapterTune200(ModelLinkAdapter100): """ 2.0.0 tune 相关接口相较1.0.0 输入数据处理变更了 diff --git a/profiler/msprof_analyze/tinker/megatron_patch/arguments.py b/profiler/msprof_analyze/tinker/megatron_patch/arguments.py index 6ea718018..86aa7c381 100644 --- a/profiler/msprof_analyze/tinker/megatron_patch/arguments.py +++ b/profiler/msprof_analyze/tinker/megatron_patch/arguments.py @@ -24,9 +24,11 @@ NUM_DENSE = 0 def get_num_layers(): return NUM_LAYERS + def get_dense(): return NUM_DENSE + def extra_args_provider_decorator(extra_args_provider): @wraps(extra_args_provider) def wrapper(parser): diff --git a/profiler/msprof_analyze/tinker/megatron_patch/microbatches.py b/profiler/msprof_analyze/tinker/megatron_patch/microbatches.py index 0d65486e1..0f1081992 100644 --- a/profiler/msprof_analyze/tinker/megatron_patch/microbatches.py +++ b/profiler/msprof_analyze/tinker/megatron_patch/microbatches.py @@ -62,9 +62,9 @@ def rebuild_num_microbatches_calculator(): from megtron.core.num_microbatches_calculator import reconfigure_micro_batch_calculator reconfigure_micro_batch_calculator(rank=args.rank, rampup_batch_size=None, - global_batch_size = args.global_batch_size, - micro_batch_size = args.micro_batch_size, - data_parallel_size = args.data_parallel_size) + global_batch_size=args.global_batch_size, + micro_batch_size=args.micro_batch_size, + data_parallel_size=args.data_parallel_size) return except ImportError: pass diff --git a/profiler/msprof_analyze/tinker/model/adapter_utils.py b/profiler/msprof_analyze/tinker/model/adapter_utils.py index 5cb47c05b..0f9a74d1d 100644 --- a/profiler/msprof_analyze/tinker/model/adapter_utils.py +++ b/profiler/msprof_analyze/tinker/model/adapter_utils.py @@ -29,16 +29,18 @@ from tinker.model.block_adapters import BlockAdapter, legacy_block_adapters, mco from tinker.utils.config import TINKER_DIR from tinker.utils.constant import Version from tinker.utils.logger import logger -from tinker.utils.utils import write_lines, project_root, find_keywords_line_idx, get_lines, read_file, path_to_package, \ - check_path_exist +from tinker.utils.utils import write_lines, project_root, find_keywords_line_idx,\ + get_lines, read_file, path_to_package, check_path_exist from tinker.utils.constant import MODULE_NAME, PYTHON_STANDARD_INDENT + PYTHON_STANDARD_INDENT = ' ' * 4 MODULE_NAME = 'genned_block_forward' block_adapter_file_path = os.path.join(TINKER_DIR, f'model/{MODULE_NAME}.py') -def get_framework_path(version:str) ->str: + +def get_framework_path(version:str) -> str: """临时方案""" framework_path = os.path.join(project_root(), 'modellink-ref', f'modellink-{version}') check_path_exist(framework_path) @@ -210,7 +212,7 @@ def get_module_methods(module_obj) -> Dict[str, str]: return {} methods: Dict[str, str] = {} for name, obj in inspect.getmembers(module_obj, inspect.isfunction): - if getattr(obj, "__module__", None) == module_obj.__name__L + if getattr(obj, "__module__", None) == module_obj.__name__: obj_file = inspect.getsourcefile(obj) if obj_file == module_file: try: @@ -219,15 +221,18 @@ def get_module_methods(module_obj) -> Dict[str, str]: pass return methods + class _CallNameCollector(ast.NodeVisitor): def __init__(self): self.called_names: Set[str] = set() + def visit_Call(self, node: ast.Call): if isinstance(node.func, ast.Name): self.called_names.add(node.func.id) self.generic_visit(node) -def collect_called_names(code:str) -> Set[str]: + +def collect_called_names(code: str) -> Set[str]: try: tree = ast.parse(code) except SyntaxError: @@ -254,7 +259,7 @@ def error_free_import(): raise RuntimeError(f'No available patch framework') -def gen_block_adapter(use_mcore_models): +def gen_block_adapter(version, use_mcore_models): """ 从用户提供的版本以及是否启用mcore_model,动态生成适配每个版本的 block_adapter :param version: 版本号 @@ -301,6 +306,7 @@ def gen_block_adapter(use_mcore_models): raise write_lines(file_content.splitlines(), block_adapter_file_path) + def get_block_adapters(version, use_mcore_models): if use_mcore_models: if version == Version.ML_2_0_0: @@ -308,6 +314,7 @@ def get_block_adapters(version, use_mcore_models): return mcore_block_adapters return legacy_block_adapters + def find_used_top_func(import_code_str, method_forward_str, module_obj): """ 前向方法中用到,但import中没有,那么需要加入这个方法 diff --git a/profiler/msprof_analyze/tinker/model/block_adapters.py b/profiler/msprof_analyze/tinker/model/block_adapters.py index bb6221aac..deee203f9 100644 --- a/profiler/msprof_analyze/tinker/model/block_adapters.py +++ b/profiler/msprof_analyze/tinker/model/block_adapters.py @@ -197,4 +197,4 @@ mcore_block_adapters = [ ] mcore_block_adapters_1_1_rc1 = [adapter for adapter in mcore_block_adapters] -mcore_block_adapters_1_1_rc1[3] = replace(mcore_block_adapters[3],method_location=replace(mcore_block_adapters[3].method_location,start_key_word='attention_mask)')) \ No newline at end of file +mcore_block_adapters_1_1_rc1[3] = replace(mcore_block_adapters[3], method_location=replace(mcore_block_adapters[3].method_location, start_key_word='attention_mask)')) \ No newline at end of file diff --git a/profiler/msprof_analyze/tinker/profiler/block_profiler.py b/profiler/msprof_analyze/tinker/profiler/block_profiler.py index dce044ca7..223eeaba8 100644 --- a/profiler/msprof_analyze/tinker/profiler/block_profiler.py +++ b/profiler/msprof_analyze/tinker/profiler/block_profiler.py @@ -31,7 +31,7 @@ from torch_npu.npu import amp # 选择引用的ModelLink版本 from tinker import megatron_patch from tinker.framework_adapter.modellink_adapter import get_adapter, ModelLinkAdapter -from tinker.megatron_patch.arguments import get_num_layers,get_dense +from tinker.megatron_patch.arguments import get_num_layers, get_dense from tinker.megatron_patch.microbatches import rebuild_num_microbatches_calculator from tinker.model.block_infos import get_model_block_infos, BlockInfo from tinker.profiler.profile_classes import ProfileArgs, InitTensorInfo @@ -285,8 +285,7 @@ class TinkerProfiler: self.adapter.pre_time_profile_backward_step() # 同步所有rank torch.distributed.barrier() - #self.adapter.backward_step(backward_input_tensors, origin_outputs, output_grads) - self.adapter.custom_backward(origin_outputs,output_grads[0]) + self.adapter.custom_backward(origin_outputs, output_grads[0]) torch.distributed.barrier() # 额外跑一次对齐时间 self.adapter.custom_backward(origin_outputs, output_grads[0]) @@ -498,13 +497,13 @@ class TinkerProfiler: EXCEPTIONAL_VALUE, EXCEPTIONAL_VALUE, EXCEPTIONAL_VALUE] continue run_times = 0 - while self._should_run(block_info,run_times): + while self._should_run(block_info, run_times): run_times += 1 oom = self._get_result(block_info, oom) self.dump_profiled_results(block_infos) return not oom - def _get_result(self, block_info, oom): + def _get_result(self, block_info, oom): profile_logger.info(f"working on {block_info.dump_name}{self.profile_args.hint} ... ") try: @@ -533,8 +532,8 @@ class TinkerProfiler: allocated_fwd, reserved_fwd, reserved_bwd] return oom - def _should_run(self, block_info:BlockInfo, run_times: int): - should_modify = get_dense() and 'transformer' in block_info.name and run_times <2 + def _should_run(self, block_info: BlockInfo, run_times: int): + should_modify = get_dense() and 'transformer' in block_info.name and run_times < 2 if should_modify: #首次进入,先将layers搞成dense self._modify_transformer_layers(block_info, already_dense=run_times) @@ -559,7 +558,7 @@ class TinkerProfiler: """此处使用modellink代码,需密切关注patch、变更情况""" return HeadInputTensor(self.adapter.get_head_input()) - def _modify_transformer_layers(self,block_info:BlockInfo, already_dense): + def _modify_transformer_layers(self, block_info: BlockInfo, already_dense): if not already_dense: block_info.dump_name = 'transformer_dense' if self.origin_module is None: diff --git a/profiler/msprof_analyze/tinker/profiler/profile.sh b/profiler/msprof_analyze/tinker/profiler/profile.sh index c969a401c..cf999d8d7 100644 --- a/profiler/msprof_analyze/tinker/profiler/profile.sh +++ b/profiler/msprof_analyze/tinker/profiler/profile.sh @@ -35,7 +35,7 @@ PROFILING_PATH=${5} #profiler 需要使用的训练框架版本信息 export ML_VERSION=${6} TINKER_ARGS=${7} -export IS_TUNE=${10:-0} +export IS_TUNE=${8} #可调节的profiler超参,目前看取3-10和10-40无影响 WARMUP_TIMES=3 REPEAT_TIMES=10 diff --git a/profiler/msprof_analyze/tinker/profiler/profile_space.py b/profiler/msprof_analyze/tinker/profiler/profile_space.py index 6bfebfa83..b8a37e52e 100644 --- a/profiler/msprof_analyze/tinker/profiler/profile_space.py +++ b/profiler/msprof_analyze/tinker/profiler/profile_space.py @@ -79,7 +79,7 @@ class ScriptArgs: ep: int = 1 cp: int = 1 # --moe-tp-extend-ep:做的TP在MoE-mlp部分做EP,此处为01取值,1为开启,开启后实际EP = TP + EP - extend: int =0 + extend: int = 0 @property @@ -94,14 +94,14 @@ class ScriptArgs: p_times = 1 for p in [self.tp, self.cp]: if p > 1: - p_times *=p + p_times *= p #若EP在CP中放不下,则要新增DP if self.cp % self.ep: if self.ep % self.cp: p_times *= self.ep else: p_times *= self.ep // self.cp - return p_times + return p_times def items(self): return self.__dict__.items() @@ -134,9 +134,9 @@ class ScriptArgs: def build_megtron_command_line_arguments(self, version, max_mbs, kv_repeat): result = [f'--tensor-model-parallel-size {self.tp}', f'--pipeline-model-parallel-size 1', f'--distributed-timeout-minutes 5', f'--prof-mbs-limit {max_mbs}'] - if self.ep > 1 : + if self.ep > 1: result.append(f'--expert-model-parallel-size {self.ep}') - #TODO 后续版本军科启用 --moe-alltoall-overlap-comm, 无法与gemm-gradient-accumulation-fusion 同时使用 + # 后续版本均可启用 --moe-alltoall-overlap-comm, 无法与gemm-gradient-accumulation-fusion 同时使用 if version == Version.ML_2_0_0 and self.tp == 1: result.append(f'--moe-alltoall-overlap-comm') if self.sp: @@ -149,7 +149,6 @@ class ScriptArgs: result.append(f'--kv-head-repeat-before-uly-alltoall') if self.extend: result.append(f'--moe-tp-extend-ep') - #result.append('--moe-alltoall-overlap-comn') return '\n'.join(result) @@ -248,7 +247,7 @@ class TinkerScripter: MAX_MBS = 2 ** 30 MAX_NPU = 8 - def __init__(self, model, model_size, suffix, args, model_args,is_full_tune): + def __init__(self, model, model_size, suffix, args, model_args, is_full_tune): self.model = model self.model_size = model_size self.profiler_script = f"{TINKER_DIR}/profiler/profile.sh" @@ -315,13 +314,13 @@ class TinkerScripter: Feature(name="extend", range=extend_space, memory_impact=MemoryImpact.NEUTRAL), ], self.MAX_NPU, self.MAX_MBS, self.model_args) - def get_kv_repeat(self, config:ScriptArgs) -> bool: + def get_kv_repeat(self, config: ScriptArgs) -> bool: return self.model_args.group_query_attention and config.cp > self.model_args.num_query_groups def run_config(self, config: ScriptArgs, max_mbs: int) -> int: # 格式化model_size model_size = extract_and_format_model_size(self.model_size) - kv_repeat = self.get_kv_repeat(config) + kv_repeat = self.get_kv_repeat(config) cmd_args_str = config.build_megtron_command_line_arguments(self.version, max_mbs, kv_repeat) # 测量block内并行策略性能 tp sp ep cp command = ['bash', self.profiler_script, self.model, model_size, config.exp_id, str(config.npu_used), @@ -440,7 +439,7 @@ def run(args: argparse.Namespace): pre_log(pre_logging_text) model_args = get_model_structure(args) # 自动生成adapter - gen_block_adapter(args.version,hasattr(model_args, 'use_mcore_models') and model_args.use_mcore_models) + gen_block_adapter(args.version, hasattr(model_args, 'use_mcore_models') and model_args.use_mcore_models) profiler = TinkerScripter(model_name, model_size, suffix, args, model_args, args.is_full_tune) # 生成参数空间 arg_space = profiler.get_arg_space() diff --git a/profiler/msprof_analyze/tinker/search/cost_model.py b/profiler/msprof_analyze/tinker/search/cost_model.py index 6d5ee9f68..bbf827b46 100644 --- a/profiler/msprof_analyze/tinker/search/cost_model.py +++ b/profiler/msprof_analyze/tinker/search/cost_model.py @@ -32,7 +32,7 @@ from tinker.search.arguments import print_args, preprocess_args from tinker.search.data import TaskParam, SearchArgs, ResultArgs, Metrics from tinker.search.process import ResultOutputHandler from tinker.utils.block_args import BlockArgs, BlockCost, DetailedInfo -from tinker.utils.utils import load_infos, convert_to_pp_stage_block_idx,byte_to_mb +from tinker.utils.utils import load_infos, convert_to_pp_stage_block_idx, byte_to_mb from tinker.utils.logger import logger, init_log FeaturesType = ProfileArgs @@ -111,7 +111,8 @@ class TinkerCostModel: # 剔除transformer block块后的其余块个数 self.num_procs_per_node = args.num_npus_per_node self.args = args - self.block_names = self._get_block_names() #BlockName(*self.profiled_data.get_block_name()) + # BlockName(*self.profiled_data.get_block_name()) + self.block_names = self._get_block_names() @property def _data_ready(self): @@ -233,7 +234,7 @@ class TinkerCostModel: block_data = data[block_name] return BlockArgs(self.args, profiled_args, block_data) - def set_seq_length_relate_mem(self, profile_args:ProfileArgs, block_list: List[BlockArgs]): + def set_seq_length_relate_mem(self, profile_args: ProfileArgs, block_list: List[BlockArgs]): if self.args.use_mcore_models: block_list1[1].attention_mask_mem = 4 else: @@ -241,15 +242,15 @@ class TinkerCostModel: for block in block_list[1:-2]: block.attention_mask_mem = attention_mask_mem if profile_args.sp: - num_elements = self.args.seq_length *self.args.hiddensize * 2 /profile_args.cp + num_elements = self.args.seq_length * self.args.hiddensize * 2 /profile_args.cp block_list[1].attention_mask_mem = byte_to_mb(num_elements) def init_blocks(self, profile_args: ProfileArgs) -> List[BlockArgs]: """当前就是头处理 + 若干个block + 尾处理,调用时机确定 ProfileArgs 之后""" # 头处理块 block_list: List[BlockArgs] = [] - for block_name ,block_num in self.args.block_structure: - for i in range(block_num): + for block_name, block_num in self.args.block_structure: + for _ in range(block_num): block_list.append(self.get_block_args(block_name, profile_args)) #transformer block 注入仅使用一次的 与序列长度`seq_length`相关的内存占用 self.set_seq_length_relate_mem(profile_args, block_list) @@ -372,8 +373,9 @@ class TinkerCostModel: if self.inter_band is None and self.intra_band is None: raise RuntimeError("Intra bandwidth and intra bandwidth file are required.") self._band_data_ready = True + def _get_block_names(self): - return [block_name for block_name, block_num in self.args.block_structure] + return [block_name for block_name, block_num in self.args.block_structure] def convert_to_pp_stage_block_idx(num_layer_list: List[int], num_all_blocks_len: int): @@ -408,7 +410,7 @@ def run(args: argparse.Namespace): # 1. 实例化CostModel cost_model = TinkerCostModel(args) # 2. 从观测数据中用户指定的仿真所需数据 args -> pred_profiled_args - pred_profiled_args = ProfileArgs(tp=args.simu_tp, sp=args.simu_sp, ep=args.simu_ep,cp=args.simu_cp, + pred_profiled_args = ProfileArgs(tp=args.simu_tp, sp=args.simu_sp, ep=args.simu_ep, cp=args.simu_cp, extend=args.simu_extend, mbs=args.micro_batch_size) # 3. 计算开销 # 3.1 生成子图 diff --git a/profiler/msprof_analyze/tinker/search/optimize.py b/profiler/msprof_analyze/tinker/search/optimize.py index e3dbb85fc..bfc87be87 100644 --- a/profiler/msprof_analyze/tinker/search/optimize.py +++ b/profiler/msprof_analyze/tinker/search/optimize.py @@ -116,7 +116,7 @@ class TinkerOptimizer(Optimizer): # stage变量的搜索空间生成 num_layers = sum(num_layer for _, num_layer in args.block_structure) pp_space = TinkerCostModel.get_pp_range(num_npus, num_layers, profiled_args) # type: Iterator - if isinstance(profiled_args.ep, int) and profiled_args.ep > 1 and args.version == Verison.ML_1_0_rc1: + if isinstance(profiled_args.ep, int) and profiled_args.ep > 1 and args.version == Version.ML_1_0_rc1: dist_opt_space = [0] else: dist_opt_space = [0, 1] @@ -125,7 +125,7 @@ class TinkerOptimizer(Optimizer): # 生成任务队列 for pp, dist_opt, recompute in itertools.product(pp_space, dist_opt_space, recompute_space): dp = num_npus // pp // profiled_args.tp // profiled_args.cp - if profiled_args.ep >dp: + if profiled_args.ep > dp: continue local_batch_size = dp * profiled_args.mbs if args.global_batch_size % local_batch_size or dp * profiled_args.cp == 1 and dist_opt: diff --git a/profiler/msprof_analyze/tinker/utils/constant.py b/profiler/msprof_analyze/tinker/utils/constant.py index 620afabe5..196f17bbc 100644 --- a/profiler/msprof_analyze/tinker/utils/constant.py +++ b/profiler/msprof_analyze/tinker/utils/constant.py @@ -29,7 +29,7 @@ class Version(enum.Enum): return self.value @classmethod - def is_valid_value(cls,value: str) -> bool: + def is_valid_value(cls, value: str) -> bool: return value in cls._value2member_map_ VERSION_ALIASES: Dict[str, Version] = { diff --git a/profiler/msprof_analyze/tinker/utils/utils.py b/profiler/msprof_analyze/tinker/utils/utils.py index e9a016ac5..e54329934 100644 --- a/profiler/msprof_analyze/tinker/utils/utils.py +++ b/profiler/msprof_analyze/tinker/utils/utils.py @@ -174,7 +174,7 @@ def print_result(user_args, config_result_pair): value_dict = dict(tp=strategy.tp, pp=strategy.pp, dp=strategy.dp, cp=strategy.cp, sp=strategy.sp, ep=strategy.ep, extend=strategy.extend, - zero=strategy.zero, mbs=strategy.mbs,recompute=strategy.rc, + zero=strategy.zero, mbs=strategy.mbs, recompute=strategy.rc, num_layer_list=list(map(int, strategy.num_layers.split(','))), time_costs=time_costs, mem_costs=mem_costs) # 先加后删 diff --git a/profiler/msprof_analyze/tinker/version.py b/profiler/msprof_analyze/tinker/version.py index 65467169a..7e092d4b7 100644 --- a/profiler/msprof_analyze/tinker/version.py +++ b/profiler/msprof_analyze/tinker/version.py @@ -53,7 +53,8 @@ def dump_task_info(dir_path, infos=None, cm=False): if infos: version_info.update(infos) os.makedirs(dir_path, exist_ok=True) - if isinstance(version_info['version'], Version): - version_info['version'] = version_info['version'].value + version_tmp = version_info.get('version') + if isinstance(version_tmp, Version): + version_info['version'] = version_tmp.value with open(os.path.join(dir_path, f'VERSION_{version}.json'), 'w') as version_file: - json.dump(version_info, version_file, indent=4,ensure_ascii=False) + json.dump(version_info, version_file, indent=4, ensure_ascii=False) -- Gitee From 6da38e17da90909a65f3ea0ebb1f9c5af92ef629 Mon Sep 17 00:00:00 2001 From: zhangning Date: Tue, 26 Aug 2025 12:10:32 +0800 Subject: [PATCH 7/9] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dcodecheck=E6=A3=80?= =?UTF-8?q?=E6=9F=A5=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- profiler/msprof_analyze/tinker/model/adapter_utils.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/profiler/msprof_analyze/tinker/model/adapter_utils.py b/profiler/msprof_analyze/tinker/model/adapter_utils.py index 64f79fa91..0f9a74d1d 100644 --- a/profiler/msprof_analyze/tinker/model/adapter_utils.py +++ b/profiler/msprof_analyze/tinker/model/adapter_utils.py @@ -212,7 +212,7 @@ def get_module_methods(module_obj) -> Dict[str, str]: return {} methods: Dict[str, str] = {} for name, obj in inspect.getmembers(module_obj, inspect.isfunction): - if getattr(obj, "__module__", None) == module_obj.__name__L + if getattr(obj, "__module__", None) == module_obj.__name__: obj_file = inspect.getsourcefile(obj) if obj_file == module_file: try: @@ -221,15 +221,18 @@ def get_module_methods(module_obj) -> Dict[str, str]: pass return methods + class _CallNameCollector(ast.NodeVisitor): def __init__(self): self.called_names: Set[str] = set() + def visit_Call(self, node: ast.Call): if isinstance(node.func, ast.Name): self.called_names.add(node.func.id) self.generic_visit(node) -def collect_called_names(code:str) -> Set[str]: + +def collect_called_names(code: str) -> Set[str]: try: tree = ast.parse(code) except SyntaxError: @@ -256,7 +259,7 @@ def error_free_import(): raise RuntimeError(f'No available patch framework') -def gen_block_adapter(use_mcore_models): +def gen_block_adapter(version, use_mcore_models): """ 从用户提供的版本以及是否启用mcore_model,动态生成适配每个版本的 block_adapter :param version: 版本号 @@ -303,6 +306,7 @@ def gen_block_adapter(use_mcore_models): raise write_lines(file_content.splitlines(), block_adapter_file_path) + def get_block_adapters(version, use_mcore_models): if use_mcore_models: if version == Version.ML_2_0_0: @@ -310,6 +314,7 @@ def get_block_adapters(version, use_mcore_models): return mcore_block_adapters return legacy_block_adapters + def find_used_top_func(import_code_str, method_forward_str, module_obj): """ 前向方法中用到,但import中没有,那么需要加入这个方法 -- Gitee From abafc3b6aff3ce324d676bf5f065bb60c71c6ee1 Mon Sep 17 00:00:00 2001 From: zhangning Date: Tue, 26 Aug 2025 12:26:30 +0800 Subject: [PATCH 8/9] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dcodecheck=E6=A3=80?= =?UTF-8?q?=E6=9F=A5=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- profiler/msprof_analyze/tinker/model/adapter_utils.py | 2 +- profiler/msprof_analyze/tinker/model/block_adapters.py | 8 +++++++- profiler/msprof_analyze/tinker/search/cost_model.py | 3 +-- profiler/msprof_analyze/tinker/version.py | 2 +- 4 files changed, 10 insertions(+), 5 deletions(-) diff --git a/profiler/msprof_analyze/tinker/model/adapter_utils.py b/profiler/msprof_analyze/tinker/model/adapter_utils.py index 0f9a74d1d..578726557 100644 --- a/profiler/msprof_analyze/tinker/model/adapter_utils.py +++ b/profiler/msprof_analyze/tinker/model/adapter_utils.py @@ -40,7 +40,7 @@ MODULE_NAME = 'genned_block_forward' block_adapter_file_path = os.path.join(TINKER_DIR, f'model/{MODULE_NAME}.py') -def get_framework_path(version:str) -> str: +def get_framework_path(version: str) -> str: """临时方案""" framework_path = os.path.join(project_root(), 'modellink-ref', f'modellink-{version}') check_path_exist(framework_path) diff --git a/profiler/msprof_analyze/tinker/model/block_adapters.py b/profiler/msprof_analyze/tinker/model/block_adapters.py index deee203f9..43ef98471 100644 --- a/profiler/msprof_analyze/tinker/model/block_adapters.py +++ b/profiler/msprof_analyze/tinker/model/block_adapters.py @@ -197,4 +197,10 @@ mcore_block_adapters = [ ] mcore_block_adapters_1_1_rc1 = [adapter for adapter in mcore_block_adapters] -mcore_block_adapters_1_1_rc1[3] = replace(mcore_block_adapters[3], method_location=replace(mcore_block_adapters[3].method_location, start_key_word='attention_mask)')) \ No newline at end of file +mcore_block_adapters_1_1_rc1[3] = replace( + mcore_block_adapters[3], + method_location=replace( + mcore_block_adapters[3].method_location, + start_key_word='attention_mask)' + ) +) \ No newline at end of file diff --git a/profiler/msprof_analyze/tinker/search/cost_model.py b/profiler/msprof_analyze/tinker/search/cost_model.py index bbf827b46..9bfd9ca53 100644 --- a/profiler/msprof_analyze/tinker/search/cost_model.py +++ b/profiler/msprof_analyze/tinker/search/cost_model.py @@ -111,7 +111,6 @@ class TinkerCostModel: # 剔除transformer block块后的其余块个数 self.num_procs_per_node = args.num_npus_per_node self.args = args - # BlockName(*self.profiled_data.get_block_name()) self.block_names = self._get_block_names() @property @@ -242,7 +241,7 @@ class TinkerCostModel: for block in block_list[1:-2]: block.attention_mask_mem = attention_mask_mem if profile_args.sp: - num_elements = self.args.seq_length * self.args.hiddensize * 2 /profile_args.cp + num_elements = self.args.seq_length * self.args.hiddensize * 2 / profile_args.cp block_list[1].attention_mask_mem = byte_to_mb(num_elements) def init_blocks(self, profile_args: ProfileArgs) -> List[BlockArgs]: diff --git a/profiler/msprof_analyze/tinker/version.py b/profiler/msprof_analyze/tinker/version.py index 7e092d4b7..effbadfb4 100644 --- a/profiler/msprof_analyze/tinker/version.py +++ b/profiler/msprof_analyze/tinker/version.py @@ -53,7 +53,7 @@ def dump_task_info(dir_path, infos=None, cm=False): if infos: version_info.update(infos) os.makedirs(dir_path, exist_ok=True) - version_tmp = version_info.get('version') + version_tmp = version_info.get('version') if isinstance(version_tmp, Version): version_info['version'] = version_tmp.value with open(os.path.join(dir_path, f'VERSION_{version}.json'), 'w') as version_file: -- Gitee From faeda74878fb2956273b90e7b4ef9fb04e7daef8 Mon Sep 17 00:00:00 2001 From: zhangning Date: Tue, 26 Aug 2025 15:21:45 +0800 Subject: [PATCH 9/9] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=A3=80=E6=9F=A5?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../tinker/megatron_patch/microbatches.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/profiler/msprof_analyze/tinker/megatron_patch/microbatches.py b/profiler/msprof_analyze/tinker/megatron_patch/microbatches.py index 0f1081992..a2406c097 100644 --- a/profiler/msprof_analyze/tinker/megatron_patch/microbatches.py +++ b/profiler/msprof_analyze/tinker/megatron_patch/microbatches.py @@ -35,14 +35,6 @@ def get_num_microbatches(): return _GLOBAL_NUM_MICROBATCHES_CALCULATOR.get() -def get_current_global_batch_size(): - return _GLOBAL_NUM_MICROBATCHES_CALCULATOR.get_current_global_batch_size() - - -def update_num_microbatches(consumed_samples, consistency_check=True): - _GLOBAL_NUM_MICROBATCHES_CALCULATOR.update(consumed_samples, consistency_check) - - def _build_num_microbatches_calculator(args): global _GLOBAL_NUM_MICROBATCHES_CALCULATOR modellink_version = os.getenv('ML_VERSION', "1.1") @@ -56,6 +48,10 @@ def _build_num_microbatches_calculator(args): _GLOBAL_NUM_MICROBATCHES_CALCULATOR = build_num_microbatches_calculator(args) +def get_current_global_batch_size(): + return _GLOBAL_NUM_MICROBATCHES_CALCULATOR.get_current_global_batch_size() + + def rebuild_num_microbatches_calculator(): args = get_args() try: @@ -69,3 +65,8 @@ def rebuild_num_microbatches_calculator(): except ImportError: pass _build_num_microbatches_calculator(args) + + +def update_num_microbatches(consumed_samples, consistency_check=True): + _GLOBAL_NUM_MICROBATCHES_CALCULATOR.update(consumed_samples, consistency_check) + -- Gitee