diff --git a/profiler/msprof_analyze/tinker/CHANGELOG.md b/profiler/msprof_analyze/tinker/CHANGELOG.md
new file mode 100644
index 0000000000000000000000000000000000000000..4a069962fcbfa7e93d748358ca5e4f033d37fada
--- /dev/null
+++ b/profiler/msprof_analyze/tinker/CHANGELOG.md
@@ -0,0 +1,45 @@
+### Profiler
+
+1.3.3
+
+- 子图前向代码动态提取
+
+1.3.1b
+
+- 根据脚本动态生成profile范围
+- 校正子图wrap逻辑,自动化生成attention_mask
+
+1.3.0
+
+- 适配ModelLink 1.0.RC3(对应原命名方式的ModelLink-1.2)
+- 使用block adapter重构mcore block
+
+1.2.2A (adapter)
+
+- 使用adapter封装对训练框架的import
+- 4block: 使用block adapter重构legacy block
+
+1.2.2
+
+- 张量信息自动提取
+- 在`barrier`之后再多跑一轮预热,并将`event.start()`放在预热后
+- 将`host`侧`time.time()`时间测量方式修改为`torch.cuda.Event`的`elapsed_time`测量方式,去除所有`synchronize`操作
+- 在时间测量前新增`barrier`同步各设备
+
+1.2.0
+
+- 适配ModelLink 1.0.RC2(对应原命名方式的ModelLink-1.1)
+
+### Optimizer
+
+1.3.0
+
+- 入口改造
+- 修复并行策略重复问题
+
+1.2.0
+
+- 调整`zero`切分内容:2阶优化器状态、全精度权重参数
+- 调整`reserved`内存仿真逻辑
+- 增加`attention_mask`内存占用仿真
+- 调整`recompute`内存仿真,开启时保留`input`内存占用
\ No newline at end of file
diff --git a/profiler/msprof_analyze/tinker/ReadMe.md b/profiler/msprof_analyze/tinker/ReadMe.md
index c9968f1439f3741de209a22371325b49b25fbc57..58f4c45337d536611425bed31d51f31db46222bf 100644
--- a/profiler/msprof_analyze/tinker/ReadMe.md
+++ b/profiler/msprof_analyze/tinker/ReadMe.md
@@ -1,34 +1,28 @@
 # 功能概述
 
-Tinker大模型并行策略自动寻优系统(后称Tinker),根据提供的训练脚本, 进行约15分钟的单节点NPU性能测量, 推荐高性能并行策略训练脚本。
-
-1.数据测量:指定待优化的`.sh`训练脚本, `Tinker`通过`profiler`,获取指定模型在当前硬件上,各模型子图在不同并行策略下的开销性能数据;
-2.策略寻优:指定待优化的`.sh`训练脚本以及步骤1测量数据, `Tinker`通过`optimizer`, 推荐时间开销最优的并行策略。
-3.获取推荐结果脚本:策略寻优默认将脚本生成在`./results/`下的子文件夹中。
+Tinker大模型并行策略自动寻优系统(后称Tinker),根据提供的训练脚本,进行约15分钟的单节点NPU性能测量,推荐高性能并行策略训练脚本。
+1. 数据测量:指定待优化的`.sh`训练脚本,`Tinker`通过`profiler`,获取指定模型在当前硬件上,各模型子图在不同并行策略下的开销性能数据;
+2. 策略寻优:指定待优化的`.sh`训练脚本以及步骤1测量数据,`Tinker`通过`optimizer`,推荐时间开销最优的并行策略。
+3. 获取推荐结果脚本:策略寻优默认将脚本生成在`./results/`下的子文件夹中。
 
 # 使用前CheckList
 
-1. 配置训练框架相关环境, 请确保使用当前环境已安装/配置`CANN套件`、`训练框架依赖库`、`megatron`、`mindspeed加速库`等依赖(Tinker运行需求训练框架能力);
-
-   | 软件及版本 | ModelLink 1.0.RC3 (1.2) | ModelLink 1.0.RC2 (1.1) | ModelLink 1.0.RC1 (1.0) |
-   |-----------|-------------------------|-------------------------|-------------------------|
-   | Python | 3.8 | 3.8 | 3.8 |
-   | driver | Ascend HDK 24.1.RC3 | Ascend HDK 24.1.RC2 | Ascend HDK 24.1.RC1 |
-   | driver | Ascend HDK 24.1.RC3 | Ascend HDK 24.1.RC2 | Ascend HDK 24.1.RC1 |
-   | firmware | Ascend HDK 24.1.RC3 | Ascend HDK 24.1.RC2 | Ascend HDK 24.1.RC1 |
-   | CANN | CANN 8.0.RC3 | CANN 8.0.RC2 | CANN 8.0.RC1 |
-   | torch | 2.1.0、 2.2.0 | 2.1.0、 2.2.0 | 2.1.0 |
-   | torch_npu | release v6.0.RC3 | release v6.0.RC2 | release v6.0.RC1 |
-   | mindspeed | git checkout 4ea42a23 | git checkout 2b0edd | git checkout 224ae35 |
+1. 配置训练框架相关环境:请确保当前环境已安装/配置`CANN套件`、`训练框架依赖库`、`megatron`、`mindspeed加速库`等依赖(Tinker运行依赖训练框架能力);
+
+   | 软件及版本 | ModelLink 1.0.RC3 (1.2) | ModelLink 1.0.RC2 (1.1) | ModelLink 1.0.RC1 (1.0) |
+   |-----------|-------------------------|-------------------------|-------------------------|
+   | Python | 3.8 | 3.8 | 3.8 |
+   | driver | Ascend HDK 24.1.RC3 | Ascend HDK 24.1.RC2 | Ascend HDK 24.1.RC1 |
+   | firmware | Ascend HDK 24.1.RC3 | Ascend HDK 24.1.RC2 | Ascend HDK 24.1.RC1 |
+   | CANN | CANN 8.0.RC3 | CANN 8.0.RC2 | CANN 8.0.RC1 |
+   | torch | 2.1.0、2.2.0 | 2.1.0、2.2.0 | 2.1.0 |
+   | torch_npu | release v6.0.RC3 | release v6.0.RC2 | release v6.0.RC1 |
+   | mindspeed | git checkout 4ea42a23 | git checkout 2b0edd | git checkout 224ae35 |
 
   其他依赖安装步骤请参考训练框架readme
-2. 
准备`待优化训练脚本`: 配置待优化的`.sh`训练脚本, 特别是词表文件(TOKENIZER_PATH或TOKENIZER_MODEL)和数据文件(DATA_PATH)路径配置(Tinker寻优时需要对齐用户实际训练时的模型(训练配置)); - -3. 导入ModelLink(MindSpeed-LLM)框架路径`export ML_PATH=/XX/XX/XX/MindSpeed-LLM` - -4. 克隆mstt库`git clone https://gitee.com/ascend/mstt.git`,并切换到poc-tinker分支`git switch poc-tinker` +2. 准备`待优化训练脚本`:配置待优化的`.sh`训练脚本,特别是词表文件(TOKENIZER_PATH或TOKENIZER_MODEL)和数据文件(DATA_PATH)路径配置(Tinker寻优时需要对齐用户实际训练时的模型(训练配置)); # 具体使用方式 @@ -36,7 +30,7 @@ Tinker大模型并行策略自动寻优系统(后称Tinker),根据提供 【通用入口】 -`python /xxx/tinker/tinker_auto_parallel.py --mode XXX --config XXX.json ...` +`python ./tinker/tinker_auto_parallel.py --mode XXX --config XXX.json ...` 【参数】 @@ -46,7 +40,6 @@ Tinker大模型并行策略自动寻优系统(后称Tinker),根据提供 【注意】 1. mode、config以外的其它参数均为tinker工具使能的相关运行参数,手动指定后会覆盖config.json中的默认参数 - 2. 使能tinker工具时可选择在命令中指定相关参数,也可以选择直接将参数写入config.json中并指定为config参数对应的配置文件 【配置文件示例】 @@ -54,10 +47,10 @@ Tinker大模型并行策略自动寻优系统(后称Tinker),根据提供 ```json { "profile": { - "model_name": null, - "model_size": null, - "pretrain_script_path": null, - "version": "1.1", + "model_name": "qwen15", + "model_size": "7b", + "pretrain_script_path": "./pretrain_qwen15_7b_ptd.sh", + "version": "1.2", "save_path": "./profiled_data", "prof_tp": null, "prof_sp": null, @@ -67,7 +60,6 @@ Tinker大模型并行策略自动寻优系统(后称Tinker),根据提供 }, "simulate": { "profiled_data_path": null, - "num_layers": null, "global_batch_size": null, "num_nodes": null, "num_npus_per_node": null, @@ -82,12 +74,13 @@ Tinker大模型并行策略自动寻优系统(后称Tinker),根据提供 }, "search": { "profiled_data_path": null, - "global_batch_size": null, - "num_nodes": null, - "num_npus_per_node": null, - "cpus": 5, + "global_batch_size": 64, + "num_nodes": 4, + "num_npus_per_node": 8, + "cpus": 20, "memory_limit": 57000, - "output_dir": null + "output_dir": "./results", + "pretrain_script_path_search": null } } ``` @@ -96,15 +89,15 @@ Tinker大模型并行策略自动寻优系统(后称Tinker),根据提供 【入口】 -`python /xxx/tinker/tinker_auto_parallel.py --mode profile` +`python ./tinker/tinker_auto_parallel.py --mode profile` 【参数】 -- `--model_name`: 位置参数, 用于标识模型名称 -- `--model_size`: 位置参数, 用于标识模型尺寸 +- `--model_name`: 位置参数,用于标识模型名称 +- `--model_size`: 位置参数,用于标识模型尺寸 - `-sh`: 指定`待优化脚本` -- `-v`: 指定使用的ModelLink框架版本, 可选`1.2` `1.1` `1.0` 分别对应`1.0.RC3` `1.0.RC2` `1.0.RC1`(可选,默认值1.1) -- `--save_path`: 指定采集数据存放路径(可选,默认为`。/profiled_data`) +- `-v`: 指定使用的ModelLink框架版本,可选`1.2` `1.1` `1.0` 分别对应`1.0.RC3` `1.0.RC2` `1.0.RC1`(可选,默认值1.1) +- `--save_path`: 指定采集数据存放路径(可选,默认为`./profiled_data`) - `--prof_tp`: 指定要采集profiling数据的TP值(可选,默认全部TP) - `--prof_sp`: 指定要采集profiling数据的SP值(可选,默认全部SP) - `--max_mbs`: 指定最大micro batch size(可选,默认值65536) @@ -114,53 +107,53 @@ Tinker大模型并行策略自动寻优系统(后称Tinker),根据提供 【命令示例】 ```shell -python /xxx/tinker/tinker_auto_parallel.py --mode profile --config_path /xxx/tinker/parameter_config.json --model_name qwen15 --model_size 7b -sh ./pretrain_qwen15_7b_ptd.sh -v 1.1 +python ./tinker/tinker_auto_parallel.py --mode profile --config_path ./tinker/parameter_config.json --model_name qwen15 --model_size 7b -sh ./pretrain_qwen15_7b_ptd.sh -v 1.1 ``` -运行后, Tinker会在`./profiled_data`下生成形如`profiled_data_xxx`的测量数据文件夹, 内含若干`.csv`文件以及运行日志 +运行后,Tinker会在`./profiled_data`下生成形如`profiled_data_xxx`的测量数据文件夹,内含若干`.csv`文件以及运行日志 ## 策略寻优 【入口】 -`python /xxx/tinker/tinker_auto_parallel.py --mode search` +`python ./tinker/tinker_auto_parallel.py --mode search` 【参数】 -- `-profiled`: 指定性能数据路径, 仅传入文件夹名称时, Tinker会在`./profiled_data`路径下寻找该文件 -- `-gbs` `-nodes` `-n`: 分别指定数据尺寸、节点数量、单节点计算卡数 -- `-mem`: 指定内存上限(单位MB,仅算子内存占用, 64G推荐设定57000,性能摸高时可设定60000; 32G内存推荐设定25000, 性能摸高可设定27000)(可选) -- `-cpus`: 
指定多线程加速计算时使用的进程数(可选,默认值5)
+- `-profiled`: 指定性能数据路径,仅传入文件夹名称时,Tinker会在`./profiled_data`路径下寻找该文件夹
+- `-gbs` `-nodes` `-n`: 分别指定数据批尺寸、节点数量、单节点计算卡数
+- `-mem`: 指定内存上限(单位MB,仅算子内存占用,64G内存推荐设定57000,性能摸高时可设定60000;32G内存推荐设定25000,性能摸高时可设定27000)
+- `-cpus`: 指定多进程加速计算时使用的进程数(可选,默认值5)
 - `--output_dir`: 指定输出结果路径(可选,默认为`./results`)
 
 【命令示例】请修改入参后运行
 
 ```shell
-python /xxx/tinker/tinker_auto_parallel.py --mode search --config_path /xxx/tinker/parameter_config.json -profiled profiled-data-qwen15-7b-241026-165956 -gbs 32 -nodes 1 -nodes 8 -mem 57000 -cpus 10
+python ./tinker/tinker_auto_parallel.py --mode search --config_path ./tinker/parameter_config.json -profiled profiled-data-qwen15-7b-241026-165956 -gbs 32 -nodes 1 -n 8 -mem 57000 --cpus 10
 ```
 
-运行完成后, Tinker会在`results`文件中生成类似`qwen15-7b-gbs32-56000-1nodes8npus-2024-11-18-01-50-38`的文件夹, 其中`.log`为运行日志, `config`文件夹存放命令行参数结果。
+运行完成后,Tinker会在results文件夹中生成类似`qwen15-7b-gbs32-56000-1nodes8npus-2024-11-18-01-50-38`的文件夹,其中`.log`为运行日志,`config`文件夹存放命令行参数结果。
 
 ## 一键运行(性能测量&策略寻优)
 
 【入口】
 
-`python /xxx/tinker/tinker_auto_parallel.py --mode all`
+`python ./tinker/tinker_auto_parallel.py --mode all`
 
 【参数】性能测量+策略寻优
 
 【命令示例】请修改入参后运行
 
 ```shell
-python /xxx/tinker/tinker_auto_parallel.py --mode all --config_path /xxx/tinker/parameter_config.json --model_name qwen15 --model_size 7b -sh ./pretrain_qwen15_7b_ptd.sh -v 1.1 -gbs 32 -nodes 1 -n 8 -mem 57000 --cpus 10
+python ./tinker/tinker_auto_parallel.py --mode all --config_path ./tinker/parameter_config.json --model_name qwen15 --model_size 7b -sh ./pretrain_qwen15_7b_ptd.sh -v 1.1 -gbs 32 -nodes 1 -n 8 -mem 57000 --cpus 10
 ```
 
 ## 仿真
 
 【入口】
 
-`python /xxx/tinker/tinker_auto_parallel.py --mode simulate`
+`python ./tinker/tinker_auto_parallel.py --mode simulate`
 
 【参数】
 
-- `-profiled`: 指定性能数据路径, 仅传入文件夹名称时, Tinker会在`./profiled_data`路径下寻找该文件
+- `-profiled`: 指定性能数据路径,仅传入文件夹名称时,Tinker会在`./profiled_data`路径下寻找该文件夹
 - `-gbs` `-nodes` `-n`: 分别指定数据尺寸、节点数量、单节点计算卡数
 - `--simu_tp`: 指定tp值(可选,默认值1)
 - `--simu_pp`: 指定pp值(可选,默认值1)
@@ -170,33 +163,32 @@ python /xxx/tinker/tinker_auto_parallel.py --mode all --config_path /xxx/tinker/
 - `-mbs`: 指定micro batch size
 - `--num_layer_list`: 模型分层列表,例如4,4,4,4
 - `--recompute`: 是否开启重计算(0关或1开)
-- `-d`: 是否展示详细内存结构
+- `-d`: 是否展示详细开销信息
 
 【命令示例】请修改入参后运行
 
 ```shell
-python /xxx/tinker/tinker_auto_parallel.py --mode simulate --config_path /xxx/tinker/parameter_config.json -profiled profiled-data-qwen15-7b-241026-165956 --num_nodes 1 --num_npus_per_node 8 --simu_tp 2 --simu_pp 4 --simu_sp 1 --zero 0 -mbs 4 --num_layer_list 8,8,8,4 --recompute 1 --global_batch_size 64
+python ./tinker/tinker_auto_parallel.py --mode simulate --config_path ./tinker/parameter_config.json -profiled profiled-data-qwen15-7b-241026-165956 --num_nodes 1 --num_npus_per_node 8 --simu_tp 2 --simu_pp 4 --simu_sp 1 --zero 0 -mbs 4 --num_layer_list 8,8,8,4 --recompute 1 --global_batch_size 64
 ```
 
 ## FAQ
 
 1. 性能测量中Tinker对训练脚本依赖的说明
-
-   Tinker测量数据时会处理、储存指定的训练脚本并部分运行。请确保正确填写`TOKENIZER_PATH`、`DATA_PATH`(可参考ModelLink ReadMe填写)。Tinker在处理脚本时, 会删除torchrun逻辑、并行策略、预训练权重存取相关命令行参数,然后将其他模型、训练相关参数存放于`GPT_ARGS`中。具体剔除的参数包括,
-
-   ```
-   --tensor-model-parallel-size 2
-   --pipeline-model-parallel-size 2
-   --sequence-parallel
-   --context-parallel-size
-   --num-layers-per-virtual-pipeline-stage
-   --recompute-xxx
-   --use-distributed-optimizer
-   --overlap-param-gather
-   --num-layer-list
-   --save
-   --load
-   ```
-
-2. 策略寻优中设定内存上限`-mem`推荐值的解:当前推荐值为经验值,。Tinker仅预测torch算子占用内存开销。因此设定`-mem`时需注意预留CANN、HCCL等组件的内存开销, 并避免极端内存使用带来的反复内存搬移带来的性能降低。
-3. Tinker在策略寻优完成后可能推荐新的并行策略, 此时权重参数可能需要额外转换, 请确保脚本中预训练权重参数(ckpt相关参数)匹配新并行策略, 且文件路径已正确配置。
\ No newline at end of file
+
+   Tinker测量数据时会处理、储存指定的训练脚本并部分运行。请确保正确填写`TOKENIZER_PATH`,`DATA_PATH`(可参考ModelLink ReadMe填写)。Tinker在处理脚本时,会删除torchrun逻辑、并行策略、预训练权重存取相关命令行参数,然后将其他模型、训练相关参数存放于`GPT_ARGS`中。具体剔除的参数包括(剔除逻辑可参考下方的示意代码):
+   ```
+   --tensor-model-parallel-size 2
+   --pipeline-model-parallel-size 2
+   --sequence-parallel
+   --context-parallel-size
+   --num-layers-per-virtual-pipeline-stage
+   --recompute-xxx
+   --use-distributed-optimizer
+   --overlap-param-gather
+   --num-layer-list
+   --save
+   --load
+   ```
+
+2. 策略寻优中设定内存上限`-mem`推荐值的解释:当前推荐值为经验值。Tinker仅预测torch算子占用内存开销,因此设定`-mem`时需注意预留CANN、HCCL等组件的内存开销,并避免因极端内存使用导致反复内存搬移、造成性能降低。
+3. Tinker在策略寻优完成后可能推荐新的并行策略,此时权重参数可能需要额外转换,请确保脚本中预训练权重参数(ckpt相关参数)匹配新并行策略,且文件路径已正确配置。
\ No newline at end of file
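The FAQ above describes how the parallel-strategy and checkpoint flags are stripped from the training script before profiling. The sketch below is an illustrative re-implementation of that stripping step, not Tinker's actual code; the flag list is taken from the FAQ, and `strip_parallel_args` is a hypothetical helper name.

```python
import re

# Flags removed from the training script (taken from the FAQ above);
# "--recompute-xxx" is expressed as a regex covering all --recompute-* flags.
STRIP_FLAGS = [
    "--tensor-model-parallel-size", "--pipeline-model-parallel-size",
    "--sequence-parallel", "--context-parallel-size",
    "--num-layers-per-virtual-pipeline-stage", "--recompute-[a-z-]+",
    "--use-distributed-optimizer", "--overlap-param-gather",
    "--num-layer-list", "--save", "--load",
]


def strip_parallel_args(gpt_args: str) -> str:
    """Hypothetical helper: drop each listed flag and, if present, its value token."""
    for flag in STRIP_FLAGS:
        # (?=\s|$) keeps e.g. --save-interval intact; (?!--) avoids eating the next flag
        gpt_args = re.sub(rf"{flag}(?=\s|$)(\s+(?!--)\S+)?", "", gpt_args)
    return " ".join(gpt_args.split())


print(strip_parallel_args("--num-layers 32 --tensor-model-parallel-size 2 --sequence-parallel"))
# -> "--num-layers 32"
```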
diff --git a/profiler/msprof_analyze/tinker/framework_adapter/modellink_adapter.py b/profiler/msprof_analyze/tinker/framework_adapter/modellink_adapter.py
index 7d2195ea89d9a51b50424aec2c157cede5734ff7..9433feeae1025c264ac9fd9bb89776c310ea063e 100644
--- a/profiler/msprof_analyze/tinker/framework_adapter/modellink_adapter.py
+++ b/profiler/msprof_analyze/tinker/framework_adapter/modellink_adapter.py
@@ -194,28 +194,20 @@ class ModelLinkAdapter100(ModelLinkAdapter11):
         initialize_megatron()
 
 
-version_map: Dict[str, Tuple[Type[ModelLinkAdapter], str]] = {
-    '1.0': (ModelLinkAdapter10, 'block_adapter_1_0'),
-    '1.1': (ModelLinkAdapter11, 'block_adapter_1_1'),
-    '1.2': (ModelLinkAdapter12, 'block_adapter_1_2'),
-    '1.0.0': (ModelLinkAdapter100, 'block_adapter_1_2')
+version_map: Dict[str, Type[ModelLinkAdapter]] = {
+    '1.0': ModelLinkAdapter10,
+    '1.1': ModelLinkAdapter11,
+    '1.2': ModelLinkAdapter12,
+    '1.0.0': ModelLinkAdapter100
 }
 
 
-def get_code(x: int) -> Union[Type[ModelLinkAdapter], str]:
-    toolkit_version = os.getenv('ML_VERSION', '1.1')  # 默认用ModelLink 1.1版本
-    if toolkit_version not in version_map:
-        raise NotImplementedError(f'{toolkit_version}版本的Adapter暂未支持')
-
-    return version_map[toolkit_version][x]
-
-
 def get_adapter() -> ModelLinkAdapter:
     """
     返回指定版本的adapter实例
     """
-    return get_code(0)()
-
+    toolkit_version = os.getenv('ML_VERSION', '1.1')  # 默认用ModelLink 1.1版本
+    if toolkit_version not in version_map:
+        raise NotImplementedError(f'{toolkit_version}版本的Adapter暂未支持')
 
-def get_block_adapter() -> str:
-    return get_code(1)
+    return version_map[toolkit_version]()
\ No newline at end of file
diff --git a/profiler/msprof_analyze/tinker/megatron_patch/arguments.py b/profiler/msprof_analyze/tinker/megatron_patch/arguments.py
index 85c977f5491ccb3321de8f0804a95761940e8e95..cd217faf86ff616124dcdbc07cd038f489ed8807 100644
--- a/profiler/msprof_analyze/tinker/megatron_patch/arguments.py
+++ b/profiler/msprof_analyze/tinker/megatron_patch/arguments.py
@@ -63,7 +63,7 @@ def _add_tinker_args(parser):
 
 
 def _add_profiler_args(parser):
-    profiler_group = parser.add_argument_group(title='flexpipe_profiler')
+    profiler_group = parser.add_argument_group(title='block_profiler')
 
     profiler_group.add_argument('--prof-path', type=str, default=None, help='')
     profiler_group.add_argument('--prof-cache-file', type=str, default=None, help='')
@@ -98,7 +98,7 @@ def profile_args_wrapper(fn: Callable):
 def override_profile_args(args):
     args.data_parallel_size = args.world_size // (args.pipeline_model_parallel_size *
                                                   args.tensor_model_parallel_size * args.context_parallel_size)
-    args.global_batch_size = args.data_parallel_size  # 此次仅用于通过validation
+    args.global_batch_size = args.data_parallel_size  # 此处仅用于通过validation
     args.micro_batch_size = 1
     args.num_ops_in_each_stage = [1]
     args.virtual_pipeline_model_parallel_size = 1
diff --git a/profiler/msprof_analyze/tinker/megatron_patch/microbatches.py b/profiler/msprof_analyze/tinker/megatron_patch/microbatches.py
index bde220fd0c93e059ddb22e827ff78787f369ec42..16d6e4fbe2e0fbe247a35e599050477b4f103e71 100644
--- a/profiler/msprof_analyze/tinker/megatron_patch/microbatches.py
+++ b/profiler/msprof_analyze/tinker/megatron_patch/microbatches.py
@@ -22,24 +22,24 @@ try:
 except ImportError:
     from megatron.microbatches import build_num_microbatches_calculator, NumMicroBatchesCalculator
 
-_MICROBATCHES_NMU_CALCULATOR = None  # type: Optional[NumMicroBatchesCalculator]
+_GLOBAL_NUM_MICROBATCHES_CALCULATOR = None  # type: Optional[NumMicroBatchesCalculator]
 
 
 def get_num_microbatches():
-    return _MICROBATCHES_NMU_CALCULATOR.get()
+    return _GLOBAL_NUM_MICROBATCHES_CALCULATOR.get()
 
 
 def get_current_global_batch_size():
-    return _MICROBATCHES_NMU_CALCULATOR.get_current_global_batch_size()
+    return _GLOBAL_NUM_MICROBATCHES_CALCULATOR.get_current_global_batch_size()
 
 
 def update_num_microbatches(consumed_samples, consistency_check=True):
-    _MICROBATCHES_NMU_CALCULATOR.update(consumed_samples, consistency_check)
+    _GLOBAL_NUM_MICROBATCHES_CALCULATOR.update(consumed_samples, consistency_check)
 
 
 def _build_num_microbatches_calculator(args):
-    global _MICROBATCHES_NMU_CALCULATOR
-    _MICROBATCHES_NMU_CALCULATOR = build_num_microbatches_calculator(args)
+    global _GLOBAL_NUM_MICROBATCHES_CALCULATOR
+    _GLOBAL_NUM_MICROBATCHES_CALCULATOR = build_num_microbatches_calculator(args)
 
 
 def rebuild_num_microbatches_calculator():
diff --git a/profiler/msprof_analyze/tinker/megatron_patch/modellink_version.py b/profiler/msprof_analyze/tinker/megatron_patch/modellink_version.py
index 06170091154e745538cf1f6a111fd516013b6992..0f2dfba0295912f72f5bedb843581afaa9e199ae 100644
--- a/profiler/msprof_analyze/tinker/megatron_patch/modellink_version.py
+++ b/profiler/msprof_analyze/tinker/megatron_patch/modellink_version.py
@@ -16,12 +16,14 @@ import os
 import sys
 
-from tinker.utils.logger import logger
+from tinker.utils.logger import LOGGER
 
 
 def modellink_import():
     modellink_version = os.getenv('ML_VERSION', "1.1")
     modellink_path = os.getenv('ML_PATH', None)
+    if modellink_path is None or not os.path.exists(modellink_path):
+        raise RuntimeError("ML_PATH is not set or does not exist")
     sys.path.append(modellink_path)
     try:
         if modellink_version == "1.0":
@@ -33,9 +35,7 @@ def modellink_import():
         import modellink
         import megatron
     except ModuleNotFoundError as e:
-        raise RuntimeError("ML_PATH is not available. Please make sure it !") from e
+        raise RuntimeError("ML_PATH is not available. Please make sure it is set correctly!") from e
     from tinker.megatron_patch.patch import patch
-    if modellink_path is None or not os.path.exists(modellink_path):
-        raise RuntimeError("ML_PATH is not set")
-    logger.info(f'modellink path {modellink_path}')
+    LOGGER.info(f'modellink path {modellink_path}')
     patch()
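Since `modellink_import` above resolves everything from environment variables, a caller only needs to set `ML_PATH` and `ML_VERSION` before importing. A minimal usage sketch (the checkout path is a placeholder):

```python
import os

# Placeholder path: point ML_PATH at a local ModelLink / MindSpeed-LLM checkout.
os.environ["ML_PATH"] = "/path/to/MindSpeed-LLM"
os.environ["ML_VERSION"] = "1.2"  # a key of version_map: '1.0', '1.1', '1.2' or '1.0.0'

from tinker.megatron_patch.modellink_version import modellink_import

# Appends ML_PATH to sys.path, imports modellink/megatron, then applies tinker's patches.
modellink_import()
```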
diff --git a/profiler/msprof_analyze/tinker/model/block_infos.py b/profiler/msprof_analyze/tinker/model/block_infos.py
index fe0e4cd17b987a441cdb3bbf20eb8a725e86cac7..45d31d8e48fa5d0d0a164a27dccb7ad6ffbb4e46 100644
--- a/profiler/msprof_analyze/tinker/model/block_infos.py
+++ b/profiler/msprof_analyze/tinker/model/block_infos.py
@@ -14,50 +14,116 @@
 # limitations under the License.
 
 import importlib
+import types
-from dataclasses import dataclass
-from typing import List, Optional, Type
+from typing import List
 
 import torch
 
-from tinker.framework_adapter.modellink_adapter import get_block_adapter, ModelLinkAdapter
-from tinker.model.block_adapter import BlockAdapter
+from tinker.framework_adapter.modellink_adapter import ModelLinkAdapter
+from tinker.model.adapter_utils import MODULE_NAME, get_forward_func_name
+from tinker.model.block_adapters import BlockAdapter, mcore_block_adapters, legacy_block_adapters
 
-block_adapter = importlib.import_module(f'tinker.model.{get_block_adapter()}')
+# 模块名固定写死:gen_block_adapter动态生成的前向函数统一存放于该模块
+forward_funcs = importlib.import_module(f'tinker.model.{MODULE_NAME}')
+
+
+def standardize_forward(forward_func):
+    """
+    将调用方式从传统参数改为字典,并将输出包装成字典
+    """
+
+    def wrapper(self, input_dict):
+        # 检查输入是否为字典
+        if not isinstance(input_dict, dict):
+            raise ValueError("Input must be a dictionary")
+
+        # 调用原始的 forward_func,将字典解包为关键字参数
+        outputs = forward_func(self, **input_dict)
+
+        # 将输出包装成字典
+        if not isinstance(outputs, tuple):
+            outputs = (outputs,)
+        return {k: v for k, v in zip(self.output_name, outputs)}
+
+    return wrapper
+
+
+def get_weight_size(modules: List[torch.nn.Module]) -> int:
+    """根据入参Module 自动计算权重参数尺寸"""
+    weight_size = 0
+    for module in modules:
+        weight_size += sum(p.numel() for p in module.parameters() if p.requires_grad)
+    return weight_size
+
+
+def get_forward_func(block_name):
+    """
+    通过block名称,匹配gen_block_adapter生成的前向函数
+    """
+    return getattr(forward_funcs, get_forward_func_name(block_name))
 
 
-@dataclass
 class BlockInfo:
-    name: str
-    module: torch.nn.Module
-    block_adapter: Optional[Type[BlockAdapter]] = None
+    def __init__(self, block_adapter: BlockAdapter, model: torch.nn.Module):
+        # 所有block实例化所需的硬编码内容
+        self.adapter: BlockAdapter = block_adapter
+        # block名称,仅起到标识作用,从BlockAdapter中获取
+        self.name: str = block_adapter.block_name
+        # block对应module,延时生成
+        self.module: torch.nn.Module = self._get_module(model)
 
+    @staticmethod
+    def _get_attr(obj, module_path):
+        attribute_paths = module_path.split(".") if module_path else []
+        current = obj
+        for attr in attribute_paths:
+            current = getattr(current, attr)
+        return current
 
-def get_model_block_infos(adapter: ModelLinkAdapter) -> List[BlockInfo]:
-    """获取需要的profile的block列表 block粒度观测时即头处理 TransformerBlock 两个尾处理"""
-    block_infos = []  # type: List[BlockInfo]
-    args = adapter.get_args()
-    model = adapter.get_model()
+    def get_block(self):
+        # 1. 替换实例forward
+        self.module.forward = types.MethodType(standardize_forward(get_forward_func(self.name)), self.module)
+        # 2. 计算权重尺寸,存到可访问的地方,如block实例中
+        modules = [self._get_attr(self.module, module_name) for module_name in self.adapter.weight_param_module]
+        self.module.weight_size = get_weight_size(modules)
+        # 3. 指明block实例的输出列表
+        self.module.output_name = self.adapter.return_values
+        return self.module
+
+    def get_input_tensors(self, first_input, forward_output):
+        input_tensors = {}
+        for source in self.adapter.input_source:
+            if source.from_forward:
+                input_tensor = forward_output[source.source_name]
+            else:
+                input_tensor = getattr(first_input, source.source_name, None)
+            input_tensors[source.name] = input_tensor
+        return input_tensors
+
+    def _get_module(self, model):
+        return self._get_attr(model, self.adapter.module_path)
+
+
+def get_block_adapters(args) -> List[BlockAdapter]:
     if args.use_mcore_models:
         # mcore GPTModel
-        block_infos.append(BlockInfo("mcore-embedding", model, block_adapter.McoreEmbeddingAdapter))
-        block_infos.append(
-            BlockInfo("mcore-transformer-block", model.decoder, block_adapter.McoreTransformerBlockAdapter))
-        block_infos.append(BlockInfo("mcore-final-norm", model.decoder, block_adapter.McoreFinalNormAdapter))
-        block_infos.append(BlockInfo("mcore-post-process", model, block_adapter.McoreLossAdapter))
-
+        block_adapters = mcore_block_adapters
     else:
         # legacy GPTModel
-        encoder = model.language_model.encoder
+        block_adapters = legacy_block_adapters
+    return block_adapters
 
-        # model.language_model.pre_process
-        block_infos.append(BlockInfo("embedding", model.language_model, block_adapter.EmbeddingAdapter))
-        block_infos.append(BlockInfo("transformer-block", encoder, block_adapter.TransformerBlockAdapter))
-
-        # encoder.post_norm and encoder.post_process
-        block_infos.append(BlockInfo("final-norm", encoder, block_adapter.FinalNormAdapter))
-
-        # model.post_process
-        block_infos.append(BlockInfo("post-process", model, block_adapter.LossAdapter))
 
+def get_model_block_infos(adapter: ModelLinkAdapter) -> List[BlockInfo]:
+    """
+    通过block信息,获取需要profile的block列表
+    """
+    args = adapter.get_args()
+    model = adapter.get_model()
+    block_adapters = get_block_adapters(args)
+    block_infos = []
+    for block_adapter in block_adapters:
+        block_info = BlockInfo(block_adapter, model)
+        block_infos.append(block_info)
 
-    return block_infos
+    return block_infos
\ No newline at end of file
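To make the dict-in/dict-out convention of `standardize_forward` and `BlockInfo.get_block` above concrete, here is a minimal usage sketch with a toy module. The toy class and tensor shapes are illustrative only, and the wrapper is applied directly rather than through `get_forward_func` (importing `tinker.model.block_infos` assumes the generated forward-function module is present):

```python
import types

import torch

from tinker.model.block_infos import standardize_forward


class ToyBlock(torch.nn.Module):
    """Illustrative stand-in for a profiled block."""

    def __init__(self):
        super().__init__()
        self.linear = torch.nn.Linear(4, 4)

    def forward(self, hidden_states):
        return self.linear(hidden_states)


block = ToyBlock()
block.output_name = ["hidden_states"]  # keys used to wrap the outputs into a dict
block.forward = types.MethodType(standardize_forward(ToyBlock.forward), block)

out = block({"hidden_states": torch.randn(2, 4)})
# Non-tuple outputs are first wrapped into a 1-tuple, so out == {"hidden_states": <tensor>}
```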
diff --git a/profiler/msprof_analyze/tinker/parameter_config.json b/profiler/msprof_analyze/tinker/parameter_config.json
index 54c68a46e248e4e822eb6aa0062ce8bed9865685..160650553d6fcbe477a72f5173c12fe3a9efa6dd 100644
--- a/profiler/msprof_analyze/tinker/parameter_config.json
+++ b/profiler/msprof_analyze/tinker/parameter_config.json
@@ -1,14 +1,14 @@
 {
   "profile": {
-    "model_name": null,
-    "model_size": null,
-    "pretrain_script_path": null,
-    "version": "1.1",
+    "model_name": "example",
+    "model_size": "7b",
+    "pretrain_script_path": "./pretrain_qwen15_7b_ptd.sh",
+    "version": "1.2",
     "save_path": "./profiled_data",
     "prof_tp": null,
     "prof_sp": null,
     "max_mbs": 65536,
-    "task_id": "",
+    "task_id": "test",
     "max_npu": 8
   },
   "simulate": {
@@ -27,11 +27,12 @@
   },
   "search": {
     "profiled_data_path": null,
-    "global_batch_size": null,
-    "num_nodes": null,
-    "num_npus_per_node": null,
-    "cpus": 5,
+    "global_batch_size": 64,
+    "num_nodes": 4,
+    "num_npus_per_node": 8,
+    "cpus": 20,
     "memory_limit": 57000,
-    "output_dir": "./results"
+    "output_dir": "./results",
+    "pretrain_script_path_search": null
   }
 }
\ No newline at end of file
diff --git a/profiler/msprof_analyze/tinker/profiler/block_profiler.py b/profiler/msprof_analyze/tinker/profiler/block_profiler.py
index c814d423b618c882b1dd6be45eb350299cb46f96..91d9bd94253fe3c52748cabf0106a60ca0391b8a 100644
--- a/profiler/msprof_analyze/tinker/profiler/block_profiler.py
+++ b/profiler/msprof_analyze/tinker/profiler/block_profiler.py
@@ -18,12 +18,10 @@ import gc
 import json
 import logging
 import os
-import sys
 import time
 import traceback
 from collections import namedtuple
 from typing import Dict, List, Optional
-from pathlib import Path
 
 import numpy as np
 import torch
@@ -31,18 +29,14 @@ import torch_npu
 from torch_npu.contrib import transfer_to_npu
 from torch_npu.npu import amp
 
-current_dir = Path(__file__).resolve()
-tinker_parent_dir = current_dir.parent.parent.parent
-if tinker_parent_dir not in sys.path:
-    sys.path.append(str(tinker_parent_dir))
-
+# 选择引用的ModelLink版本
 from tinker import megatron_patch
 from tinker.framework_adapter.modellink_adapter import get_adapter, ModelLinkAdapter
 from tinker.megatron_patch.arguments import get_num_layers
 from tinker.megatron_patch.microbatches import rebuild_num_microbatches_calculator
-from tinker.model.block_infos import get_model_block_infos
+from tinker.model.block_infos import get_model_block_infos, BlockInfo
 from tinker.model.observation_block import gen_block
-from tinker.utils.logger import init_profile_log, profile_logger
+from tinker.utils.logger import init_profile_log, PROFILE_LOGGER
 from tinker.utils.npu_timer import NPUTimer
 from tinker.utils.profile_args import ProfileArgs
 from tinker.utils.utils import byte_to_mb
@@ -75,7 +69,7 @@ class TinkerProfiler:
     def __init__(self, adapter):
         self.adapter: ModelLinkAdapter = adapter
         self.args = self.adapter.get_args()
-        self.dump_model_info()  # todo 该方法当前会反复dump, 但设置变量判断是否存在又无法覆盖历史文件
+        self.dump_model_info()  # todo 该方法当前会反复dump,但设置变量判断是否存在又无法覆盖历史文件
         self.args.model_name = self.args.prof_model_name
         self.profiled_results = {}
         self.data_base = None  # from DATA_BASE
@@ -160,7 +154,7 @@ class TinkerProfiler:
         with open(dump_file_path, 'w') as f:
             json.dump(dump_dict, f, indent=4)
 
-    def infer_data_size(self, block_infos):
+    def infer_data_size(self, block_infos: List[BlockInfo]):
         """
         仅剩余 weight_size 部分的计算逻辑
         :param block_infos:
@@ -168,7 +162,7 @@
         for block_info in block_infos:
             cur_input_tensors, cur_output_tensors = self.block_tensor_map[block_info.name]
-            block = gen_block(block_info)
+            block = block_info.get_block()
 
             self.weight_size_dict[block_info.name] = np.prod(block.weight_size) * self.data_base
@@ -191,8 +185,8 @@
                 continue
             if "mask" in input_name:
                 inputs[input_name] = (
-                    torch.rand(input_info.shape, requires_grad=input_info.requires_grad, device=input_info.device,
-                               dtype=input_info.dtype) < 0.5)
+                        torch.rand(input_info.shape, requires_grad=input_info.requires_grad, device=input_info.device,
+                                   dtype=input_info.dtype) < 0.5)
             else:
                 inputs[input_name] = (
                     torch.rand(input_info.shape, requires_grad=input_info.requires_grad, device=input_info.device,
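A quick illustration of the mask-input convention in the input-generation code above: any input whose name contains "mask" is synthesized as a random boolean tensor rather than a float tensor (the shape here is illustrative only):

```python
import torch

mask = torch.rand(2, 1, 4096, 4096) < 0.5      # ~50% True, dtype becomes torch.bool
print(mask.dtype, mask.float().mean().item())  # torch.bool, ≈0.5
```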
@@ -219,9 +213,9 @@
 
         return origin_outputs, output_grads
 
-    def profile_block(self, block_info):
+    def profile_block(self, block_info: BlockInfo):
         gc.collect()
-        block = self.adapter.wrap_block(gen_block(block_info))
+        block = self.adapter.wrap_block(block_info.get_block())
         self.adapter.pre_profile_block()
 
         input_data = self.get_inputs(block_info.name)
@@ -233,7 +227,7 @@
         # 让反向中保存计算图以支撑多次调用
         self.adapter.pre_time_profile_backward_step()
-        if 'post-process' in block_info.name:
+        if "post" in block_info.name:
             # 需匹配梯度版本,1次前向 1次反向 交替进行
             for index in range(self.args.prof_repeat_times[0] + self.args.prof_warmup_times):
                 # 内存溢出得厉害,提个函数尝试规避下
@@ -332,7 +326,7 @@
             self.adapter.pre_mem_profile_backward_step()
             torch.cuda.synchronize()
-        if block_info.name == "encoder-embedding" or block_info.name == "mcore-embedding":
+        if "embedding" in block_info.name:
             torch.autograd.backward(outputs, grad_tensors=output_grads, retain_graph=True)
         else:
             backward_input_tensors = []
@@ -379,7 +373,7 @@
         if torch.distributed.get_rank() != 0:
             return
         args = self.args
-        profile_logger.info(f"====== PROFILING RESULTS ({self.profile_args.model}{self.profile_args.hint}) ======")
+        PROFILE_LOGGER.info(f"====== PROFILING RESULTS ({self.profile_args.model}{self.profile_args.hint}) ======")
         result_title = ["block_name", "forward-compute", "backward-compute", "input_size", "output_size",
                         "weights", "activations", "fwd_reserved", "bwd_reserved"]
         with open(os.path.join(args.prof_path, self.profile_args.file_name), 'w') as f_result:
@@ -409,7 +403,7 @@
             shape_list[i] = mbs
         return torch.Size(shape_list)
 
-    def update_tensor_map(self, block_infos, mbs):
+    def update_tensor_map(self, block_infos: List[BlockInfo], mbs):
         forward_output = None
         InitTensorInfo = namedtuple('InitTensorInfo', ['shape', 'requires_grad', 'device', 'dtype', 'element_size'])
         # 初始化
@@ -417,10 +411,10 @@
         first_input = self._get_first_input()
         block_tensor_map = {}
         for block_info in block_infos:
-            genned_block = gen_block(block_info)
+            genned_block = block_info.get_block()
             wrapped_block = self.adapter.wrap_block(genned_block)
             # 拿到当前 block 的 input_tensors_info 和 input_extra_tensors_info
-            input_tensors = _get_info_tensors(first_input, forward_output, block_info)
+            input_tensors = block_info.get_input_tensors(first_input, forward_output)
             forward_output = wrapped_block(input_tensors)
 
             extract_input_tensors_info = self.extract_input_tensors(input_tensors, InitTensorInfo)
@@ -463,7 +457,7 @@
                 self.update_micro_batch_size(task["mbs"])
             except AssertionError:
                 traceback.print_exc()
-                profile_logger.error(
+                PROFILE_LOGGER.error(
                     f"Invalid GBS MBS DP pair: [{args.global_batch_size, args.micro_batch_size, args.data_parallel_size}]")
                 return False
@@ -477,20 +471,16 @@
         for block_info in block_infos:
             if oom:
                 # 因profile_space的日志监控会在block_profiler发生oom时kill进程,因此该处逻辑不会进行;但保留该处以防kill失败
-                profile_logger.info(f'[results] already oom, skip {block_info.name}')
+                PROFILE_LOGGER.info(f'[results] already oom, skip {block_info.name}')
                 self.profiled_results[block_info.name] = [EXCEPTIONAL_VALUE, EXCEPTIONAL_VALUE,
                                                           self.input_size_dict[block_info.name],
                                                           self.output_size_dict[block_info.name],
                                                           self.weight_size_dict[block_info.name],
                                                           EXCEPTIONAL_VALUE, EXCEPTIONAL_VALUE, EXCEPTIONAL_VALUE]
                 continue
-            profile_logger.info(f"working on {block_info.name}{self.profile_args.hint} ... ")
+            PROFILE_LOGGER.info(f"working on {block_info.name}{self.profile_args.hint} ... ")
") try: - (_fwd_time, - _bwd_time, - _reserved_fwd, - _reserved_bwd, - _allocated_fwd) = self.profile_block(block_info) + fwd_time, bwd_time, reserved_fwd, reserved_bwd, allocated_fwd = self.profile_block(block_info) except RuntimeError as e: if "NPU out of memory" in str(e): # OOM 没必要测试更大的mbs @@ -499,17 +489,17 @@ class TinkerProfiler: profile_logger.error(f'RANK{torch.distributed.get_rank()}: {"-*/" * 20}') profile_logger.error(e) traceback.print_exc() - _fwd_time = _bwd_time = EXCEPTIONAL_VALUE - _reserved_fwd = _reserved_bwd = _allocated_fwd = EXCEPTIONAL_VALUE - profile_logger.info(f"[results] {block_info.name}: fwd_compute = {_fwd_time:.2f} us, " - f"bwd_compute = {_bwd_time:.2f} us, fwd_allocated = {_allocated_fwd:.1f} MB, " - f"fwd_reserved = {_reserved_fwd:.1f} MB, bwd_reserved = {_reserved_bwd:.1f} MB.") + fwd_time = bwd_time = EXCEPTIONAL_VALUE + reserved_fwd = reserved_bwd = allocated_fwd = EXCEPTIONAL_VALUE + profile_logger.info(f"[results] {block_info.name}: fwd_compute = {fwd_time:.2f} us, " + f"bwd_compute = {bwd_time:.2f} us, fwd_allocated = {allocated_fwd:.1f} MB, " + f"fwd_reserved = {reserved_fwd:.1f} MB, bwd_reserved = {reserved_bwd:.1f} MB.") - self.profiled_results[block_info.name] = [_fwd_time, _bwd_time, + self.profiled_results[block_info.name] = [fwd_time, bwd_time, self.input_size_dict[block_info.name], self.output_size_dict[block_info.name], self.weight_size_dict[block_info.name], - _allocated_fwd, _reserved_fwd, _reserved_bwd] + allocated_fwd, reserved_fwd, reserved_bwd] self.dump_profiled_results(block_infos) return not oom @@ -566,7 +556,7 @@ def _get_info_tensors(first_input, forward_output: dict, block_info): def get_backward_input_tensors(block_info, input_data): """""" - if block_info.name == "encoder-embedding" or block_info.name == "mcore-embedding": + if "embedding" in block_info.name: return None input_tensors = [] for input_name in input_data: @@ -607,12 +597,12 @@ def main(): oom_record = True run_well = tinker_profiler.run_profile(prof_task, oom_record) if not run_well and not oom_record: - profile_logger.info(f"[Tinker-Profiler] OOM when mbs={prof_task['mbs']}") + PROFILE_LOGGER.info(f"[Tinker-Profiler] OOM when mbs={prof_task['mbs']}") oom_record = True end_profiling_time = time.time() - profile_logger.info(f"[TOTAL PROFILING TIME] {end_profiling_time - start_profiling_time:2f} s") + PROFILE_LOGGER.info(f"[TOTAL PROFILING TIME] {end_profiling_time - start_profiling_time:2f} s") if __name__ == "__main__": diff --git a/profiler/msprof_analyze/tinker/profiler/gen_ascendcloud_structure.py b/profiler/msprof_analyze/tinker/profiler/gen_ascendcloud_structure.py index 25821b4bd03a71500ea059838f80f9c59d28ad46..e2f0ed35c8b09869897ce86f8c21728400298894 100644 --- a/profiler/msprof_analyze/tinker/profiler/gen_ascendcloud_structure.py +++ b/profiler/msprof_analyze/tinker/profiler/gen_ascendcloud_structure.py @@ -21,7 +21,7 @@ import sys sys.path.append("./") -from tinker.utils.logger import logger +from tinker.utils.logger import LOGGER from tinker.profiler.gen_model_structure import DEL_PARAM_IN_GPT_ARGS from tinker.utils.utils import read_file, extract_line, extract_line_ignore_blank, extract_between, del_line, \ del_content, write_lines @@ -64,7 +64,7 @@ def get_input_files(args): read_file(pipeline_file), read_file(pretrain_file), read_file(training_file)) except RuntimeError as exception: - logger.error(f'check if files exist: {pipeline_file}, {pretrain_file}, {training_file}') + LOGGER.error(f'check if files exist: {pipeline_file}, {pretrain_file}, 
diff --git a/profiler/msprof_analyze/tinker/profiler/gen_ascendcloud_structure.py b/profiler/msprof_analyze/tinker/profiler/gen_ascendcloud_structure.py
index 25821b4bd03a71500ea059838f80f9c59d28ad46..e2f0ed35c8b09869897ce86f8c21728400298894 100644
--- a/profiler/msprof_analyze/tinker/profiler/gen_ascendcloud_structure.py
+++ b/profiler/msprof_analyze/tinker/profiler/gen_ascendcloud_structure.py
@@ -21,7 +21,7 @@ import sys
 
 sys.path.append("./")
 
-from tinker.utils.logger import logger
+from tinker.utils.logger import LOGGER
 from tinker.profiler.gen_model_structure import DEL_PARAM_IN_GPT_ARGS
 from tinker.utils.utils import read_file, extract_line, extract_line_ignore_blank, extract_between, del_line, \
     del_content, write_lines
@@ -64,7 +64,7 @@ def get_input_files(args):
                 read_file(pipeline_file), read_file(pretrain_file), read_file(training_file))
     except RuntimeError as exception:
-        logger.error(f'check if files exist: {pipeline_file}, {pretrain_file}, {training_file}')
+        LOGGER.error(f'check if files exist: {pipeline_file}, {pretrain_file}, {training_file}')
         raise RuntimeError from exception
 
     return pipeline_file_content, pretrain_file_content, training_file_content
diff --git a/profiler/msprof_analyze/tinker/profiler/gen_model_structure.py b/profiler/msprof_analyze/tinker/profiler/gen_model_structure.py
index d4e80a4f155fb11dc9e5e4a9a90e43547327aa6f..6a804bbcc431f8021d4f689cf80ea3d71eaff8b2 100644
--- a/profiler/msprof_analyze/tinker/profiler/gen_model_structure.py
+++ b/profiler/msprof_analyze/tinker/profiler/gen_model_structure.py
@@ -25,7 +25,7 @@ import sys
 import uuid
 
 sys.path.append("./")
-from tinker.utils.logger import logger, init_log
+from tinker.utils.logger import LOGGER, init_log
 from tinker.utils.utils import project_root, extract_and_format_model_size
 from tinker.utils.utils import read_file, extract_line, extract_between, del_line, write_lines
@@ -133,7 +133,7 @@ def gen_model_structure_script(source_file: str, dest_file: str, args):
     if os.path.exists(dest_file) and not args.overwrite:
         raise RuntimeError(f'The file: {dest_file} already exists, if you want to overwrite, add param \'-o\'.')
     write_lines(final_res, dest_file)
-    logger.info(f'successfully write file to {dest_file}')
+    LOGGER.info(f'successfully wrote file to {dest_file}')
 
 
 def replace_export(match):
@@ -174,7 +174,7 @@ def gen_model_structure_version2(pretrain_script, dest_file, args):
     if os.path.exists(dest_file) and not args.overwrite:
         raise RuntimeError(f'The file: {dest_file} already exists, if you want to overwrite, add param \'-o\'.')
     write_lines(new_script_content.splitlines(), dest_file)
-    logger.info(f'successfully write file to {dest_file}')
+    LOGGER.info(f'successfully wrote file to {dest_file}')
     return cmd_dict_paris
@@ -266,7 +266,7 @@ def _hook_pretrain_script(pretrain_script):
     try:
         pretrain_script = read_file(pretrain_script)
     except (FileNotFoundError, RuntimeError):
-        logger.error(f'cannot find pretrain script: {pretrain_script}')
+        LOGGER.error(f'cannot find pretrain script: {pretrain_script}')
         raise
 
     root = project_root()
@@ -282,7 +282,7 @@ def _hook_pretrain_script(pretrain_script):
         if result.returncode != 0:
             raise RuntimeError(f"Failed run with code: {result.returncode}, error message:\n{result.stderr}")
     except Exception as e:
-        logger.error(f"An error occurred while executing the command, message: {e}")
+        LOGGER.error(f"An error occurred while executing the command, message: {e}")
         raise
     # 删除临时存的文件
     os.remove(copy_of_pretrain_file)
diff --git a/profiler/msprof_analyze/tinker/profiler/localhost_profile_intra_node_p2p.sh b/profiler/msprof_analyze/tinker/profiler/localhost_profile_intra_node_p2p.sh
index bf6d5b726ec4f3a38356c2f8dc507392b7ada8a8..7a1011e50a245f431ea48e3cfd07aa0055a8cf39 100644
--- a/profiler/msprof_analyze/tinker/profiler/localhost_profile_intra_node_p2p.sh
+++ b/profiler/msprof_analyze/tinker/profiler/localhost_profile_intra_node_p2p.sh
@@ -38,6 +38,9 @@ export HCCL_WHITELIST_DISABLE=1
 export CUDA_DEVICE_MAX_CONNECTIONS=1
 export NPU_ASD_ENABLE=0
 export ASCEND_LAUNCH_BLOCKING=1
+SCRIPT_PATH=$(realpath "$0")
+PARENT_PATH=$(dirname "$SCRIPT_PATH")
+export PYTHONPATH=$(dirname $(dirname "$PARENT_PATH")):$PYTHONPATH
 
 # set hccl timeout time in seconds 不能小于120
 export HCCL_CONNECT_TIMEOUT=128
@@ -47,8 +50,6 @@
 #MAPROF_OUTPATH=$1
 DATE=$(date '+%m%d%H%M%S')
 PROFILING_PATH=$1
-SCRIPT_PATH=$(realpath "$0")
-PARENT_PATH=$(dirname "$SCRIPT_PATH")
 
 #echo "-------------------current MAPROF_OUTPATH=$MAPROF_OUTPATH"
 echo "--------------------current PROFILING_PATH=$PROFILING_PATH"
@@ -64,4 +63,4 @@ NNODES=1 \
 GPUS_PER_NODE=2 \
 NODE_RANK=$NODE_RANK \
 FILE_NAME=$FILE_NAME \
-python3 ${PARENT_PATH}/p2p_band_profiler.py 2>&1 | tee $LOG_PATH/profiler_$DATE.log
+python3 ./tinker/profiler/p2p_band_profiler.py 2>&1 | tee $LOG_PATH/profiler_$DATE.log
\ No newline at end of file
diff --git a/profiler/msprof_analyze/tinker/profiler/p2p_band_profiler.py b/profiler/msprof_analyze/tinker/profiler/p2p_band_profiler.py
index 8c4cd17d3c36daf20aec7b55d1fbc5bdd65f77ed..1d8c4936e8853ca595e0c89147a6eacd469baff4 100644
--- a/profiler/msprof_analyze/tinker/profiler/p2p_band_profiler.py
+++ b/profiler/msprof_analyze/tinker/profiler/p2p_band_profiler.py
@@ -12,24 +12,112 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import os
+
 import csv
+import logging
+import os
+import time
+
+import torch
+import torch.distributed as dist
+import torch.multiprocessing as multiproc
+import torch_npu
+from torch_npu.contrib import transfer_to_npu
+from torch_npu.npu import amp
+
+from tinker.utils.logger import LOGGER, init_log
+
+result_file_name = os.environ.get("FILE_NAME", "p2p_band.log")
+
+
+def for_import():
+    """防止IDE优化删除import过程中包含有效逻辑但未被使用的import"""
+    torch_npu, amp, transfer_to_npu, multiproc
+
+
+def run(local_rank, global_rank):
+    """ Simple collective communication. """
+    init_log(None, logging.INFO)
+    all_data_sizes = []
+    all_bandwidths = []
+    warmup_times = 20
+    repeat_times = 50
+    torch.cuda.set_device(local_rank)
 
-def main():
-    all_data_size_mb = []
     for i in range(11):
-        data_size_mb = 2 ** i
-        all_data_size_mb.append(data_size_mb)
-    all_bandwidth_gb_per_second = [
-        3, 6, 9, 12, 15, 17, 18, 18, 18, 19, 19
-    ]
-    result_file = os.environ.get("FILE_NAME", "p2p_intra_node.csv")
-    with open(result_file, "a+") as f:
-        f_csv = csv.writer(f)
-        f_csv.writerow(all_data_size_mb)
-        f_csv.writerow(all_bandwidth_gb_per_second)
+        data_size_in_mb = 2 ** i
+        all_data_sizes.append(data_size_in_mb)
+        data_size = data_size_in_mb * 1024 * 1024 // 2
+        tensor = torch.ones(data_size, dtype=torch.float16).cuda()
+
+        if global_rank == 0:
+            for i in range(warmup_times):
+                dist.send(tensor, dst=1 - global_rank)
+            time_list = []
+            for i in range(repeat_times):
+                torch.cuda.synchronize()
+                start = time.time()
+
+                dist.send(tensor, dst=1 - global_rank)
+                torch.cuda.synchronize()
+                end = time.time()
+                time_list.append((end - start) * 1000)
+
+        elif global_rank == 1:
+            for i in range(warmup_times):
+                dist.recv(tensor, src=1 - global_rank)
+            time_list = []
+            for i in range(repeat_times):
+                torch.cuda.synchronize()
+                start = time.time()
+                dist.recv(tensor, src=1 - global_rank)
+                torch.cuda.synchronize()
+                end = time.time()
+                time_list.append((end - start) * 1000)
+
+        avg_time_result_in_ms = sum(time_list) / repeat_times
+        bandwidth_in_gb_per_second = (data_size_in_mb / 1024) / (avg_time_result_in_ms / 1000)
+        all_bandwidths.append(f"{bandwidth_in_gb_per_second:.2f}")
+        result_string = f'Rank {global_rank} | Time(averaged {repeat_times} times) = {avg_time_result_in_ms:.2f} ms, data_size = {data_size_in_mb:.2f} MB, bandwidth = {bandwidth_in_gb_per_second:.2f} GB/s'
+        if global_rank == 0:
+            LOGGER.info(result_string)
+    if global_rank == 0:
+        with open(result_file_name, "a+") as f:
+            f_csv = csv.writer(f)
+            f_csv.writerow(all_data_sizes)
+            f_csv.writerow(all_bandwidths)
+
+
+def init_process(local_rank, global_rank, world_size, fn, backend='nccl'):
+    """ Initialize the distributed environment. """
+    # Call the init process
+    init_method = 'tcp://'
+    master_ip = os.getenv('MASTER_ADDR', 'localhost')
+    master_port = os.getenv('MASTER_PORT', '6000')
+    init_method += master_ip + ':' + master_port
+
+    dist.init_process_group(
+        backend=backend,
+        world_size=world_size, rank=global_rank,
+        init_method=init_method)
+
+    fn(local_rank, global_rank)
 
 
 if __name__ == "__main__":
-    main()
+    gpus_per_node = int(os.getenv('GPUS_PER_NODE', 1))
+    num_nodes = int(os.getenv('NNODES', 1))
+    node_rank = int(os.getenv('NODE_RANK', 0))
+    multiproc.set_start_method("spawn", force=True)
+    world_size = gpus_per_node * num_nodes
+
+    processes = []
+    for local_rank in range(gpus_per_node):
+        global_rank = gpus_per_node * node_rank + local_rank
+        p = multiproc.Process(target=init_process, args=(local_rank, global_rank, world_size, run))
+        p.start()
+        processes.append(p)
+
+    for p in processes:
+        p.join()
\ No newline at end of file
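Sanity-checking the bandwidth formula used in `run` above with illustrative numbers:

```python
data_size_in_mb = 64.0       # payload of one dist.send
avg_time_result_in_ms = 3.5  # averaged over repeat_times iterations
bandwidth_in_gb_per_second = (data_size_in_mb / 1024) / (avg_time_result_in_ms / 1000)
print(f"{bandwidth_in_gb_per_second:.2f} GB/s")  # 17.86 GB/s
```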
""" + # Call the init process + init_method = 'tcp://' + master_ip = os.getenv('MASTER_ADDR', 'localhost') + master_port = os.getenv('MASTER_PORT', '6000') + init_method += master_ip + ':' + master_port + + dist.init_process_group( + backend=backend, + world_size=world_size, rank=global_rank, + init_method=init_method) + + fn(local_rank, global_rank) if __name__ == "__main__": - main() + gpus_per_node = int(os.getenv('GPUS_PER_NODE', 1)) + num_nodes = int(os.getenv('NNODES', 1)) + node_rank = int(os.getenv('NODE_RANK', 0)) + multiproc.set_start_method("spawn", force=True) + world_size = gpus_per_node * num_nodes + + processes = [] + for local_rank in range(gpus_per_node): + global_rank = gpus_per_node * node_rank + local_rank + p = multiproc.Process(target=init_process, args=(local_rank, global_rank, world_size, run)) + p.start() + processes.append(p) + + for p in processes: + p.join() \ No newline at end of file diff --git a/profiler/msprof_analyze/tinker/profiler/profile.sh b/profiler/msprof_analyze/tinker/profiler/profile.sh index e5b5f4828a2b4d59a7766da6962f893a6f40d74b..ed568aafbb400e4c397b6593d97f194852308b31 100644 --- a/profiler/msprof_analyze/tinker/profiler/profile.sh +++ b/profiler/msprof_analyze/tinker/profiler/profile.sh @@ -16,19 +16,19 @@ # limitations under the License. if [[ $(pwd) == *"ma-user"* ]]; then - source /home/ma-user/Ascend/ascend-toolkit/set_env.sh + source /home/ma-user/Ascend/ascend-toolkit/set_env.sh fi -# 1. 命令行入参校验 若异常则提示检查“profile_space.py” +# 1. 命令行入参校验 若异常则提示检查`profile_space.py` if [ "$#" -lt 4 ]; then - echo "Error: Script profile.sh requires at least 4 arguments, but get $# arguments" - echo " Supposed arguments: model_name model_size TP SP EP=0 mbs_limit=65536 save_path=./profiled_data suffix=DateTimeStamp version=1.1" - echo " Please check TinkerScripter.run_config() in profile_space.py" - exit 1 + echo "Error: Script profile.sh requires at least 4 arguments, but get $# arguments" + echo " Supposed arguments: model_name model_size TP SP EP=0 mbs_limit=65536 save_path=./profiled_data suffix=DateTimeStamp version=1.1" + echo " Please check TinkerScripter.run_config() in profile_space.py" + exit 1 fi SUFFIX=-${8:-"$(date '+%y%m%d-%H%M%S')"} if [ $SUFFIX == "-" ]; then - SUFFIX="" + SUFFIX="" fi RUNTIME_PATH=${7:-"$(pwd)/profiled_data"} mbs_limit=${6:-65536} @@ -36,7 +36,14 @@ ep=${5:-0} export ML_VERSION=${9:-1.1} -# 2. 变量初始化, 其中系统变量的export相关逻辑在各model.sh中完成 +SCRIPT_DIR=$(dirname "$(realpath "$0")") +PROJECT_PATH=$(dirname $(dirname "$SCRIPT_DIR")) +export PYTHONPATH=$PROJECT_PATH:$PYTHONPATH + +# 临时适配路径,后续将优化路径的输入和对版本信息的使用 +export ML_PATH="${PROJECT_PATH}/modellink-ref/modellink-${ML_VERSION}" + +# 2. 变量初始化,其中系统变量的export相关逻辑在各model.sh中完成 find_free_port() { local port=7000 while netstat -an | grep -q "$port"; do @@ -52,18 +59,17 @@ NNODES=1 NODE_RANK=0 DISTRIBUTED_ARGS=" - --nnodes ${NNODES} \ - --node_rank ${NODE_RANK} \ - --master_addr ${MASTER_ADDR} \ - --master_port ${MASTER_PORT} \ + --nnodes ${NNODES} \ + --node_rank ${NODE_RANK} \ + --master_addr ${MASTER_ADDR} \ + --master_port ${MASTER_PORT} \ " -# 可调节的profiler超参, 目前看取3-10和10-40没影响 +# 可调节的profiler超参,目前看取3-10和10-40无影响 WARMUP_TIMES=3 REPEAT_TIMES=10 MAX_NUM_GPUS=8 -SCRIPT_DIR=$(dirname "$(realpath "$0")") # 3. 
模型结构参数脚本,读取模型结构命令行参数 ascend_model_script="$SCRIPT_DIR/ascendcloud_model_${1}_${2}.sh" @@ -72,22 +78,22 @@ model_script="$SCRIPT_DIR/model_${1}_${2}.sh" # 检查脚本文件是否存在 if [ -f "$model_script" ]; then - effect_script=$model_script -elif [-f "$ascend_model_script" ]; then - effect_script=$ascend_model_script + effect_script=$model_script +elif [ -f "$ascend_model_script" ]; then + effect_script=$ascend_model_script else - echo "Error: Script '$model_script' or '$ascend_model_script' not found." - exit 1 + echo "Error: Script '$model_script' or '$ascend_model_script' not found." + exit 1 fi # 为不同环境生成存放词表和数据文件的路径 if [ -z "$ML_MODEL_PATH" ]; then - CURRENT_PATH=$(pwd) - if [[ $CURRENT_PATH == *"ma-user"* ]]; then - export ML_MODEL_PATH="/home/ma-user/work/modellink-resources" - else - export ML_MODEL_PATH="." - fi + CURRENT_PATH=$(pwd) + if [[ $CURRENT_PATH == *"ma-user"* ]]; then + export ML_MODEL_PATH="/home/ma-user/work/modellink-resources" + else + export ML_MODEL_PATH="." + fi fi # 待覆盖变量 @@ -103,59 +109,58 @@ MODEL_NAME=$1 MODEL_SIZE=$2 # 4. 数据落盘地址 -PROFILING_PATH="${RUNTIME_PATH}/profiled-data-${MODEL_NAME}-${MODEL_SIZE}${SUFFIX}" # 若目录不存在, 则会自动创建 +PROFILING_PATH="${RUNTIME_PATH}/profiled-data-${MODEL_NAME}-${MODEL_SIZE}${SUFFIX}" # 若目录不存在,则会自动创建 if [ "$#" -lt 8 ]; then - rm -rf $PROFILING_PATH # 未指定时,才删除重复拉起的目录,但目前没用 + rm -rf $PROFILING_PATH # 未指定时,才删除重复拉起的目录,但目前没用 fi mkdir -p ${PROFILING_PATH} PROF_ARGS=" - --prof-path ${PROFILING_PATH} \ - --prof-cache-file ${PROFILING_PATH}/${MODEL_NAME}_op_profile.pkl \ - --prof-model-name ${MODEL_NAME} \ - --prof-model-size ${MODEL_SIZE} \ - --prof-warmup-times ${WARMUP_TIMES} \ - --prof-repeat-times ${REPEAT_TIMES} \ + --prof-path ${PROFILING_PATH} \ + --prof-model-name ${MODEL_NAME} \ + --prof-model-size ${MODEL_SIZE} \ + --prof-warmup-times ${WARMUP_TIMES} \ + --prof-repeat-times ${REPEAT_TIMES} \ " torch_run() { - local tp=$1 - local sp=$2 - local ep=$3 - local mbs_limit=$4 - local dp=1 - if ((ep >= 1)); then - let dp=ep - fi - if ((tp * dp > MAX_NUM_GPUS || tp == 1 && sp == 1)); then - return 1 - fi - EXP_ID="tp${tp}_sp${sp}_ep${ep}" - echo "=================== working on ${EXP_ID} ========================" - let gpu=tp*dp - SUMMARIZE_ARGS=" - ${PROF_ARGS} - ${GPT_ARGS} - --tensor-model-parallel-size ${tp} - --pipeline-model-parallel-size 1 - --distributed-timeout-minutes 5 - " - if [ "${ep}" -ge 1 ]; then - SUMMARIZE_ARGS="${SUMMARIZE_ARGS} ${MOE_ARGS} --expert-model-parallel-size ${ep}" - fi - if [ "${sp}" -eq 1 ]; then - SUMMARIZE_ARGS="${SUMMARIZE_ARGS} --sequence-parallel" - fi - # 可规避一部分mbs oom情况 - SUMMARIZE_ARGS="${SUMMARIZE_ARGS} --prof-mbs-limit ${mbs_limit}" - echo [TIME] before profiling ${EXP_ID} : $(date '+%Y-%m-%d-%H-%M-%S') >> ${PROFILING_PATH}/profiling_${MODEL_NAME}.log - - torchrun ${DISTRIBUTED_ARGS} --nproc_per_node ${gpu} $SCRIPT_DIR/block_profiler.py \ - ${SUMMARIZE_ARGS} \ - 2>&1 | tee ${PROFILING_PATH}/profiling_${MODEL_NAME}_${EXP_ID}.log + local tp=$1 + local sp=$2 + local ep=$3 + local mbs_limit=$4 + local dp=1 + if ((ep >= 1)); then + let dp=ep + fi + if ((tp * dp > MAX_NUM_GPUS || tp == 1 && sp == 1)); then + return 1 + fi + EXP_ID="tp${tp}_sp${sp}_ep${ep}" + echo "================================ working on ${EXP_ID} ================================" + let gpu=tp*dp + SUMMARIZE_ARGS=" + ${PROF_ARGS} + ${GPT_ARGS} + --tensor-model-parallel-size ${tp} + --pipeline-model-parallel-size 1 + --distributed-timeout-minutes 5 + " + if [ "${ep}" -ge 1 ]; then + SUMMARIZE_ARGS="${SUMMARIZE_ARGS} ${MOE_ARGS} 
--expert-model-parallel-size ${ep}"
+    fi
+    if [ "${sp}" -eq 1 ]; then
+        SUMMARIZE_ARGS="${SUMMARIZE_ARGS} --sequence-parallel"
+    fi
+    # 可规避一部分mbs oom情况
+    SUMMARIZE_ARGS="${SUMMARIZE_ARGS} --prof-mbs-limit ${mbs_limit}"
+    echo [TIME] before profiling ${EXP_ID} : $(date '+%Y-%m-%d-%H-%M-%S') >> ${PROFILING_PATH}/profiling_${MODEL_NAME}.log
+
+    torchrun ${DISTRIBUTED_ARGS} --nproc_per_node ${gpu} $SCRIPT_DIR/block_profiler.py \
+        ${SUMMARIZE_ARGS} \
+        2>&1 | tee ${PROFILING_PATH}/profiling_${MODEL_NAME}_${EXP_ID}.log
 
     echo [TIME] after profiling ${EXP_ID} : $(date '+%Y-%m-%d-%H-%M-%S') >> ${PROFILING_PATH}/profiling_${MODEL_NAME}.log
 }
 
-# 6. 拉起该次profiler任务: tp sp ep mbs_limit
-torch_run $3 $4 $ep $mbs_limit
+# 6. 拉起该次profiler任务: tp sp ep mbs_limit
+torch_run $3 $4 $ep $mbs_limit
\ No newline at end of file
diff --git a/profiler/msprof_analyze/tinker/profiler/profile_space.py b/profiler/msprof_analyze/tinker/profiler/profile_space.py
index a2e074fd914e0e58dd16edfe3416d39a9c8f9e79..68f17970c09c711545da0592b7fc61940a25aab8 100644
--- a/profiler/msprof_analyze/tinker/profiler/profile_space.py
+++ b/profiler/msprof_analyze/tinker/profiler/profile_space.py
@@ -29,6 +29,7 @@ import datetime
 from typing import List, Optional, Tuple
 
 from tinker.profiler import gen_model_structure
+from tinker.model.adapter_utils import gen_block_adapter
 from tinker.utils.config import TINKER_DIR
 from tinker.utils.logger import init_log, logger
 from tinker.utils.utils import extract_and_format_model_size
@@ -217,6 +218,23 @@ class TinkerScripter:
     def can_ep(self):
         return hasattr(self.model_args, 'num_experts')
 
+    @staticmethod
+    def post_process(mbs_limit, oom, process, torchrun_failed):
+        try:
+            stderr = process.communicate(timeout=100)[1]
+        except subprocess.TimeoutExpired:
+            process.kill()
+            stderr = None
+        if stderr:
+            logger.info(f"stderr: {stderr}")
+        if process.returncode:
+            if oom:
+                logger.info(f"profile内存溢出于{mbs_limit},将裁剪剩余并行策略探索空间")
+            elif torchrun_failed:
+                logger.warning("torchrun执行错误")
+            else:
+                logger.warning("脚本执行错误")
+
     @staticmethod
     def is_valid_value(value, space):
         return space is None or value in space
@@ -281,20 +299,7 @@ class TinkerScripter:
             torchrun_failed = True
 
         # 获取剩余的标准错误输出
-        try:
-            stderr = process.communicate(timeout=100)[1]
-        except subprocess.TimeoutExpired:
-            process.kill()
-            stderr = None
-        if stderr:
-            logger.info(f"stderr: {stderr}")
-        if process.returncode:
-            if oom:
-                logger.info(f"profile内存溢出于{mbs_limit},将裁剪剩余并行策略探索空间")
-            elif torchrun_failed:
-                logger.warning(f"torchrun执行错误")
-            else:
-                logger.warning(f"脚本执行错误")
+        self.post_process(mbs_limit, oom, process, torchrun_failed)
 
         return mbs_limit
 
@@ -366,6 +371,8 @@ def run(args: argparse.Namespace):
     # (待改进)规划客户使用时的suffix使用逻辑
     pre_log(pre_logging_text)
     model_args = get_model_structure(args)
    # 自动生成adapter
+    gen_block_adapter(args.version, hasattr(model_args, 'use_mcore_models') and model_args.use_mcore_models)
     profiler = TinkerScripter(model_name, model_size, suffix, args.save_path, args.version, model_args)
     # 生成参数空间
     arg_space = profiler.get_arg_space()
@@ -383,4 +390,4 @@ def run(args: argparse.Namespace):
     # 输出profile情况
     logger.info(f'Profile Space Total Time: {time.time() - start_time:.2f}')
     logger.info(f'Profile Data Saved at {dir_path}')
-    return dir_path
+    return dir_path
\ No newline at end of file
diff --git a/profiler/msprof_analyze/tinker/profiler/wrap_pretrain.sh b/profiler/msprof_analyze/tinker/profiler/wrap_pretrain.sh
index 
03d086796fdb3a6f2cdba91cfdf7cf309f30fa3f..c87d438867e1c115b147c4a5c7aa99b9e31b77bf 100644 --- a/profiler/msprof_analyze/tinker/profiler/wrap_pretrain.sh +++ b/profiler/msprof_analyze/tinker/profiler/wrap_pretrain.sh @@ -3,39 +3,39 @@ # 初始化一个关联数组来存储导出的环境变量 declare -A exported_vars -# 重定义 ‘export' 命令以捕获环境变量 -export(){ - # 遍历所有传递给export的参数 - for var in "$@"; do - # 检查参数是否为 VAR=VALUE的形式 - if [[ "$var" == *=* ]]; then - var_name="${var%%=*}" - var_value="${var#*=}" - # 存储到关联数组 - exported_vars["$var_name"]="$var_value" - else - # 仅导出变量, 没有赋值 - var_name="$var" - var_value="${!var_name}" - exported_vars["$var_name"]="$var_value" - fi - # 执行实际的export命令 - builtin export "$var" - done +# 重定义 'export' 命令以捕获环境变量 +export() { + # 遍历所有传递给 export 的参数 + for var in "$@"; do + # 检查参数是否为 VAR=VALUE 形式 + if [[ "$var" == *=* ]]; then + var_name="${var%%=*}" + var_value="${var#*=}" + # 存储到关联数组 + exported_vars["$var_name"]="$var_value" + else + # 仅导出变量,没有赋值 + var_name="$var" + var_value="${!var_name}" + exported_vars["$var_name"]="$var_value" + fi + # 执行实际的export命令 + builtin export "$var" + done } # 重定义 'python' 命令以捕获其参数并阻止执行 -python(){ - echo "python_cmd_start" - echo "python $*" - echo "python_cmd_end" +python() { + echo "python_cmd_start" + echo "python $*" + echo "python_cmd_end" } # 重定义 'torchrun' 命令以捕获其参数并阻止执行 torchrun() { - echo "torchrun_cmd_start" - echo "torchrun $*" - echo "torchrun_cmd_end" + echo "torchrun_cmd_start" + echo "torchrun $*" + echo "torchrun_cmd_end" } # 执行原始脚本 @@ -44,6 +44,6 @@ source $1 # 输出捕获的环境变量 echo "export_start" for var in "${!exported_vars[@]}"; do - echo "export $var=${exported_vars[$var]}" + echo "export $var=${exported_vars[$var]}" done echo "export_end" \ No newline at end of file diff --git a/profiler/msprof_analyze/tinker/search/arguments.py b/profiler/msprof_analyze/tinker/search/arguments.py index a851781b6b6b0599cf0baa00da05ecfd56bf52a9..6d6b2b7a1891330c8863eb8434c5f3ed9126582e 100644 --- a/profiler/msprof_analyze/tinker/search/arguments.py +++ b/profiler/msprof_analyze/tinker/search/arguments.py @@ -23,58 +23,6 @@ from tinker.utils.logger import logger from tinker.utils.utils import project_root -model_infos = { - # model info 示例: model_size: (num_layers, encoder_seq_length, hidden_size, ffn_hidden_size, num_attention_heads, kv_channels, vocab_size, params_dtype) - "qwen15": { - "14b": [40], - "4b": [40], - "7b": [32], - "32b": [64], - "72b": [80], - }, - "baichuan2": {"13b": [40]}, - "gpt": { - "350M": (24, 2048, 1024, 1024 * 4, 16, 1024 // 16, 51200, "fp16"), - "1_3b": (24, 2048, 2048, 2048 * 4, 16, 2048 // 16, 51200, "fp16"), - "2_6b": (32, 2048, 2560, 2560 * 4, 32, 2560 // 32, 51200, "fp16"), - "6_7b": (32, 2048, 4096, 4096 * 4, 32, 4096 // 32, 51200, "fp16"), - "13b": (40, 2048, 5120, 5120 * 4, 40, 5120 // 40, 51200, "fp16"), - "65b": (80, 2048, 8192, 8192 * 4, 64, 8192 // 64, 51200, "fp16"), - "scale-layer": (1, 2048, 512, 512 * 4, 8, 512 // 8, 51200, "fp16") - }, - "llama2": { - "7b": (32, 2048, 4096, 11008, 32, 11008 // 32, 51200, "fp16"), - "13b": [40], - "34b": (48, 4096, 8192, 22016, 64, 22016 // 64, 51200, "fp16"), - "70b": [80], - }, - "llama3": {"8b": [32]}, - "aquila": { - "6b": (32, 2048, 4096, 11008, 32, 128, 51200, "fp16") - }, - "codellama": { - "34b": (48, 4096, 8192, 22016, 64, 22016 // 64, 51200, "fp16") - }, - "mixtral": { - "7x8b": (32, None, None, None, None, None, None, None) - }, - "bloom": { - "7b": (30, 2048, 4096, 4096 * 4, 32, 4096 // 32, 51200, "fp16") - }, - "chatglm3": { - "6b": (28, 8192, 4096, 13696, 32, 4096 // 32, 51200, "fp16") - }, 
- "t5": { - # 220M的情况 "220M": (12, SEQ_LEN, DECODER_SEQ_LEN, 768, 3072, 12, 64, 30592, "fp16"), - "770M": (24, 2048, 512, 1024, 4096, 16, 64, 30592, "fp16"), - "3b": (24, 2048, 512, 1024, 16384, 32, 128, 30592, "fp16"), - "6b": (24, 2048, 512, 1024, 32768, 64, 128, 30592, "fp16"), - "11b": (24, 2048, 512, 1024, 65536, 128, 128, 30592, "fp16"), - "22b": (48, 2048, 512, 1024, 65536, 128, 128, 30592, "fp16"), - }, -} - - def print_args(args): """Print arguments.""" logger.info('------------------------ arguments ------------------------') @@ -90,8 +38,6 @@ def print_args(args): def preprocess_args(args: argparse.Namespace): args.num_npus = args.num_npus_per_node * args.num_nodes - args.num_layers = model_infos.get(args.model_name, {}).get(args.model_size, [1])[0] - # 当前固定值 args.memory_main_params = 2 args.memory_optimizer = 4 @@ -100,11 +46,9 @@ def preprocess_args(args: argparse.Namespace): mission_id = ( f"{args.model_name}-{args.model_size}-gbs{args.global_batch_size}-{args.memory_limit}-{args.num_nodes}" f"nodes{args.num_npus_per_node}npus-{formatted_time}") - args.profiled_data_path = args.profiled_data_path.replace("\\", "/") - if '/' not in args.profiled_data_path: - # 文件夹路径入参不含'/'路径分隔符,则认为该文件夹在profiled_data中 - project_dir = project_root() - args.profiled_data_path = os.path.join(project_dir, 'profiled_data', args.profiled_data_path) + + if args.pretrain_script_path_search is not None: + args.pretrain_script_path = args.pretrain_script_path_search if args.mode != 'simulate': # 结果就落盘在 output_dir @@ -112,4 +56,7 @@ def preprocess_args(args: argparse.Namespace): args.config_save_path = os.path.join(args.log_path, 'configs') args.log_file = os.path.join(args.log_path, f'{mission_id}.log') - return args + # 使用 exist_ok=True 参数,这样如果目录已经存在,不会抛出 FileExistsError 异常 + os.makedirs(args.config_save_path, exist_ok=True) + + return args \ No newline at end of file diff --git a/profiler/msprof_analyze/tinker/search/cost_model.py b/profiler/msprof_analyze/tinker/search/cost_model.py index a839d2b3b616311cc1d986d4ab42b1192efb4251..516f35f44241e2c9ff55e3934423438822ca8be2 100644 --- a/profiler/msprof_analyze/tinker/search/cost_model.py +++ b/profiler/msprof_analyze/tinker/search/cost_model.py @@ -51,25 +51,12 @@ class FixedValueDict: class ProfiledData: def __init__(self): - self._fwd = defaultdict(dict) # type: ProfileDataType - self._bwd = defaultdict(dict) # type: ProfileDataType - self._in = defaultdict(dict) # type: ProfileDataType - self._out = defaultdict(dict) # type: ProfileDataType - self._w = defaultdict(dict) # type: ProfileDataType - self._act = defaultdict(dict) # type: ProfileDataType - self._reserved_fwd = defaultdict(dict) # type: ProfileDataType - self._reserved_bwd = defaultdict(dict) # type: ProfileDataType # 现在主要就用这玩意儿 self._block_data = defaultdict(dict) # type: Dict[FeaturesType, Dict[str, BlockCost]] - def __repr__(self): - return (f"ProfiledData(fwd={self._fwd}, bwd={self._bwd}, in={self._in}, out={self._out}, " - f"w={self._w}, act={self._act}, reserved_fwd={self._reserved_fwd}, reserved_bwd={self._reserved_bwd})") - @staticmethod def _get_data(datas: ProfileDataType, features: FeaturesType, block_name="") -> Union[Dict, FixedValueDict, float]: if features not in datas: - # 报错中断raise KeyError(f"feature {features} not in profiled data") logger.info(f"feature {features} not in profiled data, using 10000000.0") if block_name: return 20000000.0 @@ -81,15 +68,6 @@ class ProfiledData: def add_data(self, data: Tuple[float, ...], features: FeaturesType, block_name: str): 
self._block_data[features][block_name] = BlockCost(*data) - (self._fwd[features][block_name], - self._bwd[features][block_name], - self._in[features][block_name], - self._out[features][block_name], - self._w[features][block_name], - self._act[features][block_name], - self._reserved_fwd[features][block_name], - self._reserved_bwd[features][block_name]) = data - def add_data_from_csv(self, file_path: str, profile_args: ProfileArgs): with open(file_path, 'r') as f: src_data = csv.reader(f) @@ -153,17 +131,17 @@ class TinkerCostModel: 4. 若含头又含尾,则: max(blocks[0].bwd_reserved, blocks[-1].fwd_reserved) """ reserved_mem_costs = [] - first_stage_mem_reserved = blocks[0].data.bwd_reserved - last_stage_mem_reserved = blocks[-1].data.fwd_reserved + blocks[-1].data.act - other_stage_mem_reserved = blocks[1].data.bwd_reserved * 2 + first_stage_mem_reserved = blocks[0].max_reserved_mem + last_stage_mem_reserved = blocks[-1].max_reserved_mem + other_stage_mem_reserved = blocks[1].max_reserved_mem if pp == 1: - reserved_mem_costs.append(max(first_stage_mem_reserved, last_stage_mem_reserved)) + reserved_mem_costs.append(max(first_stage_mem_reserved, last_stage_mem_reserved, other_stage_mem_reserved)) return reserved_mem_costs - reserved_mem_costs.append(first_stage_mem_reserved) + reserved_mem_costs.append(max(first_stage_mem_reserved, other_stage_mem_reserved)) for _ in range(1, pp - 1): reserved_mem_costs.append(other_stage_mem_reserved) - reserved_mem_costs.append(last_stage_mem_reserved) + reserved_mem_costs.append(max(last_stage_mem_reserved, other_stage_mem_reserved)) return reserved_mem_costs @staticmethod @@ -275,6 +253,9 @@ class TinkerCostModel: return block_list def get_stage_status(self, current_blocks, num_npu_before, is_first_stage, is_last_stage): + """ + 此处计算与stage有关的time_cost + """ time_cost = 0 head_block = current_blocks[0] tail_block = current_blocks[-1] @@ -284,9 +265,9 @@ class TinkerCostModel: # 头尾通信开销 input_comm = 0 if is_first_stage else self.p2p_comm_time(head_block, num_npu_before, head=True) num_npu_before += head_block.num_npu_block - outpout_comm = 0 if is_last_stage else self.p2p_comm_time(tail_block, num_npu_before, tail=True) - time_cost += input_comm + outpout_comm - return num_npu_before, time_cost, input_comm, outpout_comm + output_comm = 0 if is_last_stage else self.p2p_comm_time(tail_block, num_npu_before, tail=True) + time_cost += input_comm + output_comm + return num_npu_before, time_cost, input_comm, output_comm def calculate_cost(self, param: TaskParam, pp_stage_block_intervals: list, detail=False): if detail: @@ -310,18 +291,12 @@ class TinkerCostModel: # 首block属性更改 head_block.is_first = True # 逐block计算性能 - # TODO 这里看看能否调用 get_stage_status,主要是函数过大 for block_idx in range(head_idx, tail_idx + 1): block = param.blocks[block_idx] block.num_fwd_act = TinkerCostModel.get_num_fwd_act(param.pp, p, micro_batch_num) mem_cost += block.block_mem() num_npu_before, time_cost, input_comm, output_comm = self.get_stage_status( param.blocks[head_idx: tail_idx + 1], num_npu_before, p == 0, p == param.pp - 1) - # 头尾通信开销 - input_comm = self.p2p_comm_time(head_block, num_npu_before, head=True) - num_npu_before += head_block.num_npu_block - output_comm = self.p2p_comm_time(tail_block, num_npu_before, tail=True) - time_cost += input_comm + output_comm # stage 重计算内存、内存碎片 recompute_mem = TinkerCostModel.calc_recompute_mem(param.blocks[head_idx:tail_idx + 1]) @@ -349,7 +324,7 @@ class TinkerCostModel: detail_info.print_time(bubble_time, micro_batch_num, time_cost) # 参数还原,避免后续影响 
self._refresh_blocks(param) - return time_costs, mem_costs + return Metrics(time_costs, mem_costs, max(time_costs), max(mem_costs)) def _read_block_time(self, data_path: str): """基于profiler,生成searcher参数范围;或者直接基于每个tp sp mbs [ep],去衍化dp pp zero""" @@ -426,17 +401,17 @@ def run(args: argparse.Namespace): # 3.1 生成子图 pred_blocks = cost_model.init_blocks(pred_profiled_args, args.num_layers) # 3.2 校验所给策略有效性 - remainder = args.num_npus_per_node % (args.simu_pp * pred_profiled_args.tp) + remainder = args.num_npus % (args.simu_pp * pred_profiled_args.tp) if remainder != 0: raise ValueError( "incorrect num_npus={}, pp={}, tp={}, the former must be divided into the latter two.".format( - args.num_npus_per_node, args.simu_pp, pred_profiled_args.tp + args.num_npus, args.simu_pp, pred_profiled_args.tp )) # 3.3 计算DP LBS,打包CostModel变量并刷新block npu_used = pred_profiled_args.tp * args.simu_pp - if args.num_npus_per_node % npu_used: + if args.num_npus % npu_used: raise ValueError("num_npus cannot be evenly divided by the parallel strategy, check tp pp") - dp = args.num_npus_per_node // npu_used + dp = args.num_npus // npu_used local_batch_size = dp * pred_profiled_args.mbs if args.global_batch_size % local_batch_size: raise ValueError("incorrect gbs={}, dp={}, mbs={}, the former must be divided into the latter two.".format( @@ -462,9 +437,8 @@ def run(args: argparse.Namespace): task_param.profiled_args.ep, task_param.cost_model_args['zero'], task_param.cost_model_args['recompute'], task_param.profiled_args.is_moe, args.num_layer_list) - time_costs, mem_costs = cost_model.calculate_cost(task_param, intervals, args.detail) - metrics = Metrics(time_costs, mem_costs, max(time_costs), max(mem_costs)) + metrics = cost_model.calculate_cost(task_param, intervals, args.detail) print_result(args, (strategy, metrics)) end_time = time.time() - logger.info(f"[TOTAL TIME] {end_time - start_time} s.") + logger.info(f"[TOTAL TIME] {end_time - start_time} s.") \ No newline at end of file diff --git a/profiler/msprof_analyze/tinker/search/data.py b/profiler/msprof_analyze/tinker/search/data.py index 636120a006afce176c5f58ec7bbf17a685cf3b74..1d8445657269096e021a8fe01f4f2a79dc9587e3 100644 --- a/profiler/msprof_analyze/tinker/search/data.py +++ b/profiler/msprof_analyze/tinker/search/data.py @@ -64,16 +64,6 @@ class TaskParam: blocks: List[BlockArgs] -@dataclass(frozen=True) -class SearchIntervalResult: - """ - 给定一个task_param,应当产生一个 非均匀区间,以及对应的性能结果 - """ - interval_layer_list: list - time_costs: list - mem_costs: list - - @dataclass(frozen=True) class StageData: """ @@ -82,4 +72,4 @@ class StageData: num_npu_before: int stage_time_max_min: float num_layer_list: list - stage_mem_max: float + stage_mem_max: float \ No newline at end of file diff --git a/profiler/msprof_analyze/tinker/search/gen_modellink_plan.py b/profiler/msprof_analyze/tinker/search/gen_modellink_plan.py deleted file mode 100644 index aea8c4caf776923eef026f45f6840f40e9f0a8ac..0000000000000000000000000000000000000000 --- a/profiler/msprof_analyze/tinker/search/gen_modellink_plan.py +++ /dev/null @@ -1,306 +0,0 @@ -# Copyright (c) 2025, Huawei Technologies Co., Ltd. -# All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import abc -import itertools -import logging -import os.path -import sys -import time -import argparse -from collections import OrderedDict -from datetime import datetime, timezone, timedelta -from multiprocessing import Pool -from typing import Dict, Iterator, List - -sys.path.append("./") -from tinker.search.data import ParallelStrategy, Metrics, TaskParam, SearchIntervalResult, StageData -from tinker.utils.utils import read_file, load_infos, print_result -from tinker.utils.logger import logger, init_log -from tinker.search.cost_model import TinkerCostModel -from tinker.search.arguments import print_args, preprocess_args -from tinker.utils.convert_to_trainsh import convert_to_train_script -from tinker.utils.profile_args import ProfileArgs - -MAX_FLOAT = 1.0e9 -PRECISION_REDUNDANCY = 1.0e-7 - - -class Optimizer(abc.ABC): - - def __init__(self, cost_model: TinkerCostModel, user_args, ): - self.cost_model = cost_model - self.user_args = user_args - - @abc.abstractmethod - def search_parallel_strategies(self) -> Dict[ParallelStrategy, Metrics]: - pass - - @abc.abstractmethod - def process(self, strategy_to_metrics: Dict[ParallelStrategy, Metrics]): - pass - - def optimize(self): - strategies_to_metrics = self.search_parallel_strategies() - self.process(strategies_to_metrics) - - -class ModelLinkOptimizer(Optimizer): - - def __init__(self, cost_model: TinkerCostModel, user_args): - super().__init__(cost_model, user_args) - self.script = self.read_pretrain_file() - - @staticmethod - def _convert_to_num_layers(interval_layer_list): - num_layer_list = [interval[1] - interval[0] + 1 for interval in interval_layer_list] - num_layer_list[0] -= 1 - num_layer_list[-1] -= 2 - num_layers = ','.join(map(str, num_layer_list)) - return num_layers - - def search_parallel_strategies(self): - task_params = self._gen_task_params() - search_interval_results = self._parallel_task(task_params) - return self._convert_to_dict(task_params, search_interval_results) - - def read_pretrain_file(self): - # 若用户输入了 pretrain 的脚本路径,但该文件不存在,则报错 - if self.user_args.pretrain_script_path is not None: - # 读文件 - logger.info('find pretrain script, will write top strategies into it') - try: - script = read_file(self.user_args.pretrain_script_path) - except (FileNotFoundError, RuntimeError): - logger.error(f'an error occurred when read file \'{self.user_args.pretrain_script_path}\'') - raise - else: - script = '' - logger.info('the pretrain script path is empty in user input, will write top strategies into a blank file') - logger.info('result will store in %s', self.user_args.config_save_path) - return script - - def process(self, strategy_to_metrics: Dict[ParallelStrategy, Metrics]): - # step1 相同 time_cost, 不同 mem_cost, 取最小的mem_cost; - if not strategy_to_metrics: - logger.info("no feasible config, exit") - return - - sorted_by_time = sorted(strategy_to_metrics.items(), key=lambda item: (item[1].time_cost, item[1].mem_cost)) - result = OrderedDict() - # 相同 time_cost, 不同 mem_cost, 取最小的mem_cost - time_cost_set = set() - for strategy, metric in sorted_by_time: - if metric.time_cost not in time_cost_set: - result[strategy] = 
metric - - # step2 转换脚本,取top10 - for config_rank, config_key in enumerate(result.keys()): - config_result_pair = config_key, result.get(config_key) - print_result(self.user_args, config_result_pair) - # 1 相同 time_cost,不同 mem_cost,取最小的mem_cost;2 只存 top 10的pretrain脚本 - if config_rank + 1 <= 10: - convert_to_train_script(self.user_args, config_result_pair, config_rank + 1, self.script) - pass - - # step3 打印最优结果 - best_strategy = next(iter(result.items())) - - logger.info('Best: ') - # 其他打印 print_result(sorted(config_result_pairs, key=lambda x: x[3] if x[3] is not None else 1e9)[0]) - print_result(self.user_args, best_strategy) - - def _convert_to_dict(self, task_params, search_interval_results): - result = dict() - for task_param, search_interval_result in zip(task_params, search_interval_results): - if not search_interval_result: - continue - for single_result in search_interval_result: - num_layers = self._convert_to_num_layers(single_result.interval_layer_list) - strategy = ParallelStrategy(task_param.profiled_args.mbs, task_param.cost_model_args.get('dp'), - self.user_args.global_batch_size, task_param.pp, - task_param.profiled_args.tp, task_param.profiled_args.sp, - task_param.profiled_args.ep, task_param.cost_model_args['zero'], - task_param.cost_model_args['recompute'], task_param.profiled_args.is_moe, - num_layers) - metrics = Metrics(single_result.time_costs, single_result.mem_costs, max(single_result.time_costs), - max(single_result.mem_costs)) - result[strategy] = metrics - return result - - def _gen_task_params(self): - # task 是什么定义? task的目的是为了生成 num_intervals_list,共同组成最终的并行策略 - args = self.user_args - cost_model = self.cost_model - profiled_args_list = cost_model.get_profile_arg_list() # type: List[ProfileArgs] - # 3. 通过观测数据,搜索相关策略 - task_params = [] - # 这里的逻辑应该是除 num_layer_list 之外的所有参数 - for profiled_args in profiled_args_list: - profiled_args: ProfileArgs - # 计算当前profiled_args下的 pp zero dp 取值范围 - num_npus = args.num_npus - # todo 该类约束统一处理 - if num_npus % profiled_args.tp: - continue - # stage变量的搜索空间生成 - pp_space = TinkerCostModel.get_pp_range(num_npus, args.num_layers, profiled_args) # type: Iterator - zero_space = [0] if isinstance(profiled_args.ep, int) and profiled_args.ep > 1 else [0, 1] - recompute_space = [0, 1] # todo 支持逐block重计算,当前使用统一full recompute - - # 生成任务队列 - for pp, zero, recompute in itertools.product(pp_space, zero_space, recompute_space): - dp = num_npus // pp // profiled_args.tp - local_batch_size = dp * profiled_args.mbs - if args.global_batch_size % local_batch_size or dp == 1 and zero: - continue - cost_model_args = dict(dp=dp, zero=zero, recompute=recompute) - blocks = self.cost_model.init_blocks(profiled_args, self.user_args.num_layers) - for block in blocks: - block.update_cost_model_args(cost_model_args) - # 头尾处理不做recompute - for block in [blocks[0], blocks[-2], blocks[-1]]: - block.recompute = False - task_param = TaskParam(pp, cost_model_args, profiled_args, blocks) - task_params.append(task_param) - return task_params - - def _parallel_task(self, task_params: List[TaskParam]) -> List[ParallelStrategy]: - # 寻找最优的几种划分方式 - if self.user_args.cpus <= 1: - results = [self._memory_and_rounds_search(task_param) for task_param in task_params] - else: - with Pool(self.user_args.cpus) as pool: - results = pool.map(self._memory_and_rounds_search, task_params) - return results - - def _memory_and_rounds_search(self, task_param: TaskParam): - search_round = 5 - # 用于存储一些 memory_limit 较小但 time_cost 稍大的组合 - best_results = [] - next_memory_limit = 
self.user_args.memory_limit - # 计算保留内存 - reserved_mems = TinkerCostModel.calc_reserved_mem_costs(task_param.pp, task_param.blocks) - # 动态计算memory_limits - while search_round > 0: - memory_limits = [next_memory_limit - reserved_mem for reserved_mem in reserved_mems] - interval_layer_list = self._dynamic_programming(task_param, memory_limits) - if not interval_layer_list: - break - - time_costs, mem_costs = self.cost_model.calculate_cost(task_param, interval_layer_list) - search_interval_result = SearchIntervalResult(interval_layer_list, time_costs, mem_costs) - best_results.append(search_interval_result) - search_round -= 1 - next_memory_limit = max(search_interval_result.mem_costs) - # float 精度原因,输出的next_memory_limit可能会和输入相同,导致并行策略重复,此处减一微小值 - next_memory_limit -= PRECISION_REDUNDANCY - - return best_results - - def _dynamic_programming(self, param: TaskParam, memory_limits: List[float]): - """ - 指定 memory_limit 下的最优结果 - @param param: 入参 - @param memory_limits: 各stages的reserved内存开销,刻画内存碎片 - @return: 最优结果 - """ - num_all_blocks = len(param.blocks) - profile_args = param.blocks[0].profile_args - micro_batch_num = self.user_args.global_batch_size // param.cost_model_args['dp'] // profile_args.mbs - - # 头尾处理不流水线切分约束 - head_min_num = 1 - end_min_num = 2 - # dp[i][j] i:block_num,j: stage_idx - dp = [[StageData(num_npu_before=0, stage_time_max_min=float('inf'), num_layer_list=list(), stage_mem_max=0)] - * (param.pp + 1) for _ in range(num_all_blocks + 1)] - # 动规方程定义:前i个block划分为j个stage的所有方式中,最大time_cost的最小值 - dp[0][0] = StageData(num_npu_before=0, stage_time_max_min=0, num_layer_list=list(), stage_mem_max=0) - - for j in range(1, param.pp + 1): - - for i in range(1, num_all_blocks + 1): - if i <= head_min_num: - # 约束一 - continue - - for k in range(i - 1, -1, -1): - current_blocks = param.blocks[k: i] - # 约束二: - if j == param.pp and len(current_blocks) <= end_min_num: - continue - - # 使用j-1,提前固定乘数 - num_fwd_act = TinkerCostModel.get_num_fwd_act(param.pp, j - 1, micro_batch_num) - current_stage_mem = TinkerCostModel.get_stage_mem_cost(current_blocks, num_fwd_act) - # 使用stage对应内存上限判断当前是否可以提前退出 - if current_stage_mem >= memory_limits[j - 1]: - # 倒序,可以break - break - # 计算第j个stage的时间 - current_max_status = dp[k][j - 1] - num_npu_before, time_cost, _, _ = self.cost_model.get_stage_status( - current_blocks, current_max_status.num_npu_before, j == 1, j == param.pp - ) - # 当前最佳的切分方式 - current_max_time_cost = max(dp[k][j - 1].stage_time_max_min, time_cost) - current_max_mem_cost = max(dp[k][j - 1].stage_mem_max, current_stage_mem) - if current_max_time_cost < dp[i][j].stage_time_max_min: - idx_list = dp[k][j - 1].num_layer_list - current_list = idx_list.copy() - current_list.append(k) - dp[i][j] = StageData(num_npu_before=num_npu_before, stage_time_max_min=current_max_time_cost, - num_layer_list=current_list, stage_mem_max=current_max_mem_cost) - - best_result = dp[num_all_blocks][param.pp] - if not best_result.num_layer_list: - return None - # 根据分割点,计算划分区间 - points = best_result.num_layer_list - points.append(num_all_blocks) - dynamic_stage_intervals = list() - for i in range(param.pp): - start_idx = points[i] - end_idx = points[i + 1] - dynamic_stage_intervals.append((start_idx, end_idx - 1)) - return dynamic_stage_intervals - - -def initialize(args): - # 使用 exist_ok=True 参数,这样如果目录已经存在,不会抛出 FileExistsError 异常 - os.makedirs(args.config_save_path, exist_ok=True) - # 准备logger - formatted_time = datetime.now(timezone(timedelta(hours=8))).strftime('%Y-%m-%d-%H-%M-%S') - init_log(args.log_file, 
log_level=logging.INFO) - logger.info( - f"[LOG][SEARCH]({formatted_time}) start searching for {args.model_name}, {args.model_size}, {args.num_nodes}" - f" nodes * {args.num_npus_per_node} NPUs.") - - -def run(args: argparse.Namespace): - if args.mode != 'all' and args.mode != 'search': - return - start_time = time.time() - load_infos(args) - preprocess_args(args) - initialize(args) - print_args(args) - # 1. 实例化CostModel - cost_model = TinkerCostModel(args) - optimizer = ModelLinkOptimizer(cost_model=cost_model, user_args=args) - optimizer.optimize() - end_time = time.time() - logger.info(f"[TOTAL TIME] {end_time - start_time} s.") diff --git a/profiler/msprof_analyze/tinker/tinker_auto_parallel.py b/profiler/msprof_analyze/tinker/tinker_auto_parallel.py index 8adb3a26082606a37f5b4755ffa2726bb87fb156..34eca2b1157478bd39cc74759021297efb003430 100644 --- a/profiler/msprof_analyze/tinker/tinker_auto_parallel.py +++ b/profiler/msprof_analyze/tinker/tinker_auto_parallel.py @@ -36,4 +36,4 @@ def main(): if __name__ == '__main__': - main() + main() \ No newline at end of file diff --git a/profiler/msprof_analyze/tinker/utils/block_args.py b/profiler/msprof_analyze/tinker/utils/block_args.py index 892dcac341d9e737a60f525347a120a71bdee740..d8de3e3ec48c4a973b88cc4317484707e9e48967 100644 --- a/profiler/msprof_analyze/tinker/utils/block_args.py +++ b/profiler/msprof_analyze/tinker/utils/block_args.py @@ -16,7 +16,7 @@ from dataclasses import dataclass from typing import List, Dict -from tinker.utils.logger import logger +from tinker.utils.logger import LOGGER from tinker.utils.profile_args import ProfileArgs @@ -60,40 +60,40 @@ class DetailedInfo: def print_info(self): self._round_3() # 各个成分 - logger.debug('Time Cost'.center(60, '-')) - logger.debug(f'block forward time(us): {self.block_fwd}') - logger.debug(f'block backward time with recompute(us): {self.block_bwd}') - logger.debug(f'forward time = {self.fwd / 1000:.3f} ms') - logger.debug(f'backward time = {self.bwd / 1000:.3f} ms') + LOGGER.debug('Time Cost'.center(60, '-')) + LOGGER.debug(f'block forward time(us): {self.block_fwd}') + LOGGER.debug(f'block backward time with recompute(us): {self.block_bwd}') + LOGGER.debug(f'forward time = {self.fwd / 1000:.3f} ms') + LOGGER.debug(f'backward time = {self.bwd / 1000:.3f} ms') - logger.debug('Memory Cost'.center(60, '-')) + LOGGER.debug('Memory Cost'.center(60, '-')) model_optimizer_mem = self.weight + self.grad + self.weight_bf16 + self.full_precision_weight / self.dp_zero - logger.debug( + LOGGER.debug( f'model & optimizer({model_optimizer_mem:.3f})' f' = {self._v("weight")} + {self._v("grad")} + {self._v("weight_bf16")}' f' + {self._v("full_precision_weight")} / {self._v("dp_zero")}' ) - logger.debug(f'block weights({self.block_weight})') - logger.debug(f'block activations({self.block_act})') - logger.debug(f'first time block activations({self.first_time_block_act})') + LOGGER.debug(f'block weights({self.block_weight})') + LOGGER.debug(f'block activations({self.block_act})') + LOGGER.debug(f'first time block activations({self.first_time_block_act})') def print_time(self, bubble_time, micro_batch_num, time_cost): unit_time = (self.fwd + self.bwd + self.input_comm + self.output_comm) / 1000 bubble_time = bubble_time / 1000 - unit_time - logger.debug(f'Unit Time({unit_time:.3f} ms)' + LOGGER.debug(f'Unit Time({unit_time:.3f} ms)' f' = {self._v("fwd")} + {self._v("bwd")} + {self._v("input_comm")} + {self._v("output_comm")}') - logger.debug(f'Time({time_cost / 1000:.3f})' + 
        LOGGER.debug(f'Time({time_cost / 1000:.3f})'
                     f' = bubble({bubble_time:.3f}) + mbn({micro_batch_num}) * unit_time({unit_time:.3f})')
 
     def print_mem_calc(self, mem_cost):
         self._round_3()
         # pipeline_fwd_act计算
-        logger.debug(
+        LOGGER.debug(
             f'{self._v("pipeline_fwd_act")} = '
             f'{self._v("num_fwd_act")}'
             f' * [{self._v("inputs")} + {self._v("activation")}]'
         )
-        logger.debug(
+        LOGGER.debug(
             f'Memory({mem_cost:.3f})'
             f' = {self._v("weight")} + {self._v("grad")} + {self._v("weight_bf16")}'
             f' + [{self._v("full_precision_weight")} + {self._v("optimizer_state")}]'
@@ -140,6 +140,10 @@ class BlockArgs:
             args.bf16, args.fp16 = False, True
         self.bf16 = args.bf16
 
+    @property
+    def max_reserved_mem(self):
+        return max(self.data.fwd_reserved, self.data.bwd_reserved)
+
     @property
     def num_npu_block(self):
         """返回这个block涉及的NPU个数,通常一个stage中的block返回值都相等,所以调一个block的值就行"""
diff --git a/profiler/msprof_analyze/tinker/utils/config.py b/profiler/msprof_analyze/tinker/utils/config.py
index f62cbfae3153aadea86bcb65adc7fdf2fe7f053a..3946302f6a6716baaa769c5c498431560bb8de3f 100644
--- a/profiler/msprof_analyze/tinker/utils/config.py
+++ b/profiler/msprof_analyze/tinker/utils/config.py
@@ -23,7 +23,7 @@ from tinker.utils.utils import extract_arg_value_from_json, extract_arg_names_fr
 TINKER_DIR = os.path.join(project_root(), 'tinker')
 CONFIG_PATH = os.path.join(TINKER_DIR, 'parameter_config.json')
 
-test_free_args = ['prof_tp', 'prof_sp']
+test_free_args = ['prof_tp', 'prof_sp', 'pretrain_script_path_search']
 
 
 def add_args(parser: argparse.ArgumentParser):
@@ -45,9 +45,11 @@ def add_search_args(parser: argparse.ArgumentParser):
     search_group = parser.add_argument_group(title='search group')
     search_group.add_argument('-cpus', '--cpus', type=int,
                               help='number of cpu, search process will be faster if larger')
-    search_group.add_argument('-mem', '--memory_limit', type=int, default=28000, help='memory limit')
+    search_group.add_argument('-mem', '--memory_limit', type=int, help='memory limit')
     search_group.add_argument('-output', '--output_dir', type=str, default=None,
                               help='path to save results for optimizer-search, log file etc.')
+    search_group.add_argument('-shs', '--pretrain_script_path_search', type=str,
+                              help='path to the pretrain shell script to be optimized (defaults to the profile phase\'s)')
 
 
 def add_simulate_args(parser: argparse.ArgumentParser):
@@ -69,7 +71,7 @@ def add_simulate_args(parser: argparse.ArgumentParser):
     simulate_group.add_argument('-mbs', '--micro_batch_size', type=int, help='micro batch size')
     simulate_group.add_argument('--num_layer_list', type=str,
                                 help='a list of number of layers, separated by comma; e.g., 4,4,4,4, required')
-    simulate_group.add_argument('--recompute', type=int, help='', choices=[0, 1])
+    simulate_group.add_argument('--recompute', type=int, help='enable full recompute', choices=[0, 1])
     simulate_group.add_argument('-d', '--detail', action='store_true', help='show detailed memory construct')
 
 
@@ -86,7 +88,7 @@ def add_profile_args(parser: argparse.ArgumentParser):
     profile_group.add_argument('-sp', '--prof_sp', type=str,
                                help='specify the SP-value for profiling, default for all SP')
     profile_group.add_argument('--max_mbs', type=int, help='specify the max mbs for profiling, default: 65536')
-    profile_group.add_argument('-p', '--save_path', type=str,
+    profile_group.add_argument('-p', '--save_path', type=str,
                                help='directory to save profiled data, default:`./profiled_data`')
     profile_group.add_argument('-i', '--task_id', type=str, help='specify suffix of profiled data dir')
profile_group.add_argument('--max_npu', type=int, help='specify the max npu-nums, default: 8') @@ -95,7 +97,7 @@ def add_profile_args(parser: argparse.ArgumentParser): def parse_args() -> argparse.Namespace: """接收命令行参数""" # 创建ArgumentParser 对象 - description = ('parse args for tinker auto parallel') + description = 'parse args for tinker auto parallel' parser = argparse.ArgumentParser(description=description) # 定义位置参数 @@ -111,6 +113,15 @@ def parse_args() -> argparse.Namespace: return args +def process_path(args): + if args.mode == 'search': + args.profiled_data_path = args.profiled_data_path.replace("\\", "/") + if '/' not in args.profiled_data_path: + # 文件夹路径入参不含'/'路径分隔符,则认为该文件夹在profiled_data中 + project_dir = project_root() + args.profiled_data_path = os.path.join(project_dir, 'profiled_data', args.profiled_data_path) + + def check_args(args: argparse.Namespace) -> argparse.Namespace: """参数校验""" @@ -127,18 +138,18 @@ def check_args(args: argparse.Namespace) -> argparse.Namespace: # 检验参数是否为None def check_arg_none(arg: str): """检查参数是否为None, 并根据配置文件设置默认值或记录错误""" - dafault_value = extract_arg_value_from_json(args.mode, arg, args.config_path) + default_value = extract_arg_value_from_json(args.mode, arg, args.config_path) if args.mode == 'all': test_free_args.append('profiled_data_path') if args.mode != 'profile': test_free_args.append('pretrain_script_path') - if getattr(args, arg) is None and dafault_value is None and \ + if getattr(args, arg) is None and default_value is None and \ arg not in test_free_args: # 既未指定参数, 也未在配置文件中设置有效值 error_args.append(arg) elif getattr(args, arg) is None: # 未指定参数, 采用配置文件json中的默认值 - setattr(args, arg, dafault_value) + setattr(args, arg, default_value) # 检验路径参数是否有效 def check_path_valid(mode: str): @@ -163,6 +174,7 @@ def check_args(args: argparse.Namespace) -> argparse.Namespace: error_args = [] arg_list = extract_arg_names_from_json(args.mode, args.config_path) check_args_none(arg_list) + process_path(args) check_path_valid(args.mode) - return args + return args \ No newline at end of file diff --git a/profiler/msprof_analyze/tinker/utils/constant.py b/profiler/msprof_analyze/tinker/utils/constant.py new file mode 100644 index 0000000000000000000000000000000000000000..af273d59e1d0fe1aae76a2983e220513bc4ef0ea --- /dev/null +++ b/profiler/msprof_analyze/tinker/utils/constant.py @@ -0,0 +1,32 @@ +import enum +from typing import Dict + + +class Version(enum.Enum): + # todo 待刷成1.0.RCx + MindSpeed_LLM_1_0_rc1 = "1.0" + MindSpeed_LLM_1_0_rc2 = "1.1" + MindSpeed_LLM_1_0_rc3 = "1.2" + + def __str__(self): + return self.value + + +VERSION_ALIASES: Dict[str, Version] = { + # 映射标准化版本 + "1.0": Version.MindSpeed_LLM_1_0_rc1, + "1.0.RC1": Version.MindSpeed_LLM_1_0_rc1, + "1.1": Version.MindSpeed_LLM_1_0_rc2, + "1.0.RC2": Version.MindSpeed_LLM_1_0_rc2, + "1.2": Version.MindSpeed_LLM_1_0_rc3, + "1.0.RC3": Version.MindSpeed_LLM_1_0_rc3, +} + + +def version_parse(version_str: str) -> Version: + normalized_str = version_str.strip().upper() + if normalized_str.startswith('V'): + normalized_str = normalized_str[1:] + if normalized_str not in VERSION_ALIASES: + raise ValueError(f"Unrecognized version: {version_str}, supported versions: {VERSION_ALIASES.keys()}") + return VERSION_ALIASES[normalized_str] \ No newline at end of file diff --git a/profiler/msprof_analyze/tinker/utils/convert_to_trainsh.py b/profiler/msprof_analyze/tinker/utils/convert_to_trainsh.py index 3f1c84ad4b1e2d08ab1bf46e0f121985ac366922..1d646e2882201c0faa187a86a3437e9818e8b7c3 100644 --- 
a/profiler/msprof_analyze/tinker/utils/convert_to_trainsh.py +++ b/profiler/msprof_analyze/tinker/utils/convert_to_trainsh.py @@ -15,75 +15,13 @@ # 定义原始脚本内容 from collections import namedtuple +from typing import Tuple -from tinker.utils.logger import logger -from tinker.utils.utils import extract_between, del_line, read_file, write_lines -from tinker.utils.script_template import chatglm3_script, codellama_scripts, qwen15_7b_script, qwen15_32b_script, modelarts_multinodes_scripts - -modelnamesize_to_scripts = { - "chatglm3-6B": chatglm3_script, - "chatglm3-6b": chatglm3_script, - "codellama-34B": codellama_scripts, - "codellama-34b": codellama_scripts, - "qwen15-7b": qwen15_7b_script, - "qwen15-7B": qwen15_7b_script, - "qwen15-32b": qwen15_32b_script, - "qwen15-32B": qwen15_32b_script -} - - -def can_convert_to_trainsh(args): - return f'{args.model_name}-{args.model_size}' in modelnamesize_to_scripts - - -def convert_batch_plans_to_trainshs(args, config: tuple, trace_flag: bool, config_rank: int): - global modelarts_multinodes_scripts - - pa, cma, pp, tc, mc, num_layer_list, _, _ = config - split = ",".join(map(str, num_layer_list)) - info_text = f'time{tc / 1000:.3f}_mem{mc:.2f}' - tp = pa.tp - mbs = pa.mbs - gbs = args.global_batch_size - seq = args.seq_length - sp = pa.sp - zero1 = cma['zero'] - rc = cma['recompute'] - - model_name_model_size = "{}-{}".format(args.model_name, args.model_size) - original_script = modelnamesize_to_scripts[model_name_model_size] - - if args.model_name == "codellama" or model_name_model_size in ["qwen15-32b", "qwen15-32B"]: - pass - else: - modelarts_multinodes_scripts = "\n".join( - [" " + line for line in modelarts_multinodes_scripts.splitlines()]) - - modified_script = original_script.format(modelarts_multinodes_scripts, tp, pp, mbs, gbs, seq, split, - seq, tp, pp, sp, zero1, mbs, gbs, split.replace(",", "_"), rc, info_text, - seq, tp, pp, sp, zero1, mbs, gbs, split.replace(",", "_"), rc, info_text) - - if sp: - modified_script = modified_script.replace( - "--use-flash-attn \\", "--use-flash-attn \\\n --sequence-parallel \\") - if zero1: - modified_script = modified_script.replace( - "--use-flash-attn \\", "--use-flash-attn \\\n --use-distributed-optimizer \\") - if rc: - modified_script = modified_script.replace("$OUTPUT_ARGS \\", "$OUTPUT_ARGS \\\n $RECfull_ARGS \\") - if trace_flag: - modified_script = modified_script.replace("$OUTPUT_ARGS \\", "$OUTPUT_ARGS \\\n $PROFILE_ARGS \\") - - # 将修改后的内容写入一个新的.sh文件 - trainsh_path = "{}/{}-{}-rank{}_seq{}_tp{}pp{}_sp{}zero1{}_mbs{}gbs{}_{}_rc{}_{}.sh".format( - args.config_save_path, args.model_name, args.model_size, config_rank, - seq, tp, pp, sp, zero1, mbs, gbs, split.replace(",", "_"), rc, info_text, - ) - with open(trainsh_path, "w") as file: - file.write(modified_script) - - -def convert_to_train_script(args, config: tuple, config_rank: int, pretrain_script): +from tinker.search.data import Metrics, ParallelStrategy +from tinker.utils.utils import extract_between, del_line, write_lines + + +def convert_to_train_script(args, config: Tuple[ParallelStrategy, Metrics], config_rank: int, pretrain_script): """ 将tinker并行策略嵌入用户预训练脚本,若没有,则仅生成一个 :param args: 用户参数 @@ -97,10 +35,10 @@ def convert_to_train_script(args, config: tuple, config_rank: int, pretrain_scri # 格式化输出文件名 info_text = f'time{metric.time_cost / 1000:.3f}_mem{metric.mem_cost:.2f}' split_params = strategy.num_layers.replace(',', '_') - trainsh_path = 
(f"{args.config_save_path}/{args.model_name}-{args.model_size}-rank{config_rank}_seq{args.seq_length}_tp" - f"{strategy.tp}pp{strategy.pp}_sp{strategy.sp}zero1{strategy.zero}_mbs" - f"{strategy.mbs}gbs{strategy.gbs}_{split_params}_rc{strategy.rc}_{info_text}" - f".sh") + trainsh_path = (f"{args.config_save_path}/{args.model_name}-{args.model_size}-rank{config_rank}" + f"_seq{args.seq_length}_tp{strategy.tp}_pp{strategy.pp}_sp{strategy.sp}" + f"_distopt{strategy.zero}_mbs{strategy.mbs}_gbs{strategy.gbs}_L{split_params}_rc{strategy.rc}" + f"_{info_text}.sh") script_content = write_strategy_to_file(pretrain_script, strategy) @@ -123,11 +61,12 @@ def write_strategy_to_file(script: str, strategy_param): '--num-layer-list', '--context-parallel-size ', '--context-parallel-algo ', '--ulysses-degree-in-cp ', '--cp-attention-mask-type ', '--use-cp-send-recv-overlap ', '--kv-head-repeat-before-uly-alltoall ', '--num-layers-per-virtual-pipeline-stage ', - '--overlap-param-gather '] + '--overlap-grad-reduce ', '--overlap-param-gather '] params_need_to_append = [f'--tensor-model-parallel-size {strategy_param.tp} \\', f'--micro-batch-size {strategy_param.mbs} \\', f'--global-batch-size {strategy_param.gbs} \\', + f'--overlap-grad-reduce \\', f'--pipeline-model-parallel-size {strategy_param.pp} \\'] if strategy_param.pp > 1: @@ -184,19 +123,4 @@ def write_strategy_to_file(script: str, strategy_param): insert_idx = hit_key_word_idx + num_skip_line tinker_args_in_cmd = ''.join([' ${', tinker_search_args_str, '} \\']) res_lines.insert(insert_idx, tinker_args_in_cmd) - return res_lines - - -def _pre_process(args, config: tuple): - AllParams = namedtuple('AllParams', ['split', 'tp', 'mbs', 'gbs', 'seq', 'sp', 'zero1', 'rc', 'pp']) - pa, cma, pp, tc, mc, num_layer_list, _, _ = config - split = ",".join(map(str, num_layer_list)) - tp = pa.tp - mbs = pa.mbs - gbs = args.global_batch_size - seq = args.seq_length - sp = pa.sp - zero1 = cma['zero'] - rc = cma['recompute'] - params = AllParams(split, tp, mbs, gbs, seq, sp, zero1, rc, pp) - return params + return res_lines \ No newline at end of file diff --git a/profiler/msprof_analyze/tinker/utils/logger.py b/profiler/msprof_analyze/tinker/utils/logger.py index e0f3de5cae3f741146a67c72a387a69da2862404..96ed88a337450c143b500f1d1e7dce56a7326029 100644 --- a/profiler/msprof_analyze/tinker/utils/logger.py +++ b/profiler/msprof_analyze/tinker/utils/logger.py @@ -20,6 +20,7 @@ import logging.config from datetime import datetime, timezone, timedelta from enum import Enum +from typing import Optional class CustomFormatter(logging.Formatter): @@ -123,7 +124,7 @@ def get_default_config(): return log_config -def init_log(log_path: str, log_level): +def init_log(log_path: Optional[str], log_level): """ 此方法用于初始化&更新默认的log :param log_path: 日志存的地址 @@ -192,4 +193,4 @@ def init_profile_log(log_level): # 删除输出文件 del log_config['handlers']['logfile'] - logging.config.dictConfig(log_config) + logging.config.dictConfig(log_config) \ No newline at end of file diff --git a/profiler/msprof_analyze/tinker/utils/profile_args.py b/profiler/msprof_analyze/tinker/utils/profile_args.py index b477db167cc605b01168c46d07c3bb4c0cf7e19d..2b51a63c39ef81c6be6f8dd90f9aa248b936c794 100644 --- a/profiler/msprof_analyze/tinker/utils/profile_args.py +++ b/profiler/msprof_analyze/tinker/utils/profile_args.py @@ -48,18 +48,18 @@ class ProfileArgs(ScriptArgs): @classmethod def new_from_file_name(cls, file_name: str) -> 'ProfileArgs': - """从文件名中还原出profile args """ - parser_data = {key: int(value) for key, value 
in re.findall(r"([a-zA-Z]+)(\d+)", file_name)}
+        """从文件名中还原出profile args"""
+        parsed_data = {key: int(value) for key, value in re.findall(r"([a-zA-Z]+)(\d+)", file_name)}
 
-        # 获取所有数据类的属性名, 并使用默认值
+        # 获取所有数据类的属性名,并使用默认值
         default_values = {field.name: field.default for field in fields(cls)}
 
-        # 更新默认值为字符串中的值, 忽略不存在的属性
-        for key, value in parser_data.items():
+        # 更新默认值为字符串中的值,忽略不存在的属性
+        for key, value in parsed_data.items():
             if key in default_values:
                 default_values[key] = value
 
-        # 使用解析后的值来创建数据实例
+        # 使用解析后的值来创建数据类实例
         return cls(**default_values)
 
     def update_mbs(self, mbs):
@@ -69,7 +69,6 @@ class ProfileArgs(ScriptArgs):
         return ProfileArgs(**new_dict)
 
     def _text(self, pre_split="", post_split=""):
-        # todo ep什么时候加 什么时候不加
         exclude_info = {"model"}
 
         text = ""
@@ -77,4 +76,4 @@
             if k in exclude_info:
                 continue
             text += f'{pre_split}{k}{post_split}{v}'
-        return text
+        return text
\ No newline at end of file
diff --git a/profiler/msprof_analyze/tinker/utils/utils.py b/profiler/msprof_analyze/tinker/utils/utils.py
index 6ab8584e358a48892974d3769091f3767e7d43a6..d7566ce4f0f04aa24142c389a6e8cdf4bdc20d45 100644
--- a/profiler/msprof_analyze/tinker/utils/utils.py
+++ b/profiler/msprof_analyze/tinker/utils/utils.py
@@ -17,8 +17,10 @@ import glob
 import json
 import os
 import re
+from pathlib import Path
 from typing import List
 
+from tinker.utils.constant import Version, version_parse
 from tinker.utils.logger import logger
 from tinker.version import optimizer_version
 
@@ -41,7 +43,7 @@ def read_file(file_path: str):
     :param file_path 文件路径
     Returns: 字符串
     """
-    with open(file_path, 'r') as file:
+    with open(file_path, 'r', encoding='utf-8') as file:
         return file.read()
 
 
@@ -202,11 +204,14 @@ def load_infos(args):
         data = json.load(file)
     if 'version_profiler' not in data:
         args.version_profiler = data['version']
+        args.version_framework = Version.MindSpeed_LLM_1_0_rc3
     else:
         args.version_profiler = data['version_profiler']
+        args.version_framework = version_parse(data['version'])
     args.model_name = data.get('model_name')
     args.model_size = data.get('model_size')
-    args.pretrain_script_path = data.get('pretrain_script_path')
+    if args.pretrain_script_path_search is None:
+        args.pretrain_script_path = data.get('pretrain_script_path')
 
     args.version_optimizer = optimizer_version()
 
@@ -241,11 +246,62 @@ def byte_to_mb(x):
     return x / 1024.0 / 1024.0
 
 
+def find_keywords_line_idx(source_code: str, key_word: str):
+    """
+    提取 source_code 中 key_word 所在行号的列表
+    :param source_code: 待查找的源代码文本
+    :param key_word: 待查找的关键字
+    Returns: line 索引列表
+    """
+    lines = source_code.splitlines()
+    res = []
+    # 遍历每一行,查找关键字
+    for line_idx, line in enumerate(lines):
+        if key_word in line:
+            res.append(line_idx)
+    if not res:
+        raise RuntimeError(f'Cannot find key word: {key_word} in source code')
+    return res
+
+
+def get_lines(module_code: str, start_idx: int, end_idx: int):
+    """
+    获取 module_code 中指定起止位置代码
+    :param module_code: 给定代码段
+    :param start_idx: 起始行索引(0 基)
+    :param end_idx: 截止行索引(不包含)
+    :return: 区间代码段
+    """
+    # 提取 module_code 中索引 [start_idx, end_idx) 区间的行,左闭右开
+    lines = module_code.splitlines()
+
+    # 切片的结束索引 end_idx 本身不包含在结果内
+    selected_lines = lines[start_idx:end_idx]
+
+    # 将截取的行重新连接成一个字符串,使用换行符分隔
+    return '\n'.join(selected_lines)
+
+
+def path_to_package(file_system_path):
+    """
+    将路径形式转为包形式
+    :param file_system_path: 给定路径
+    :return: 包形式字符串
+    """
+    # 创建 Path 对象
+    path = Path(file_system_path)
+    # 使用 parts 属性获取路径的各个部分
+    parts = path.parts
+    # 使用 str.join 方法将部分拼接成包路径
+    package_path = '.'.join(parts)
+    return package_path
+
+
 def extract_arg_value_from_json(mode: str, arg_name: str, json_path: str):
     """提取配置文件parameter_config.json中的参数值"""
     with open(json_path, 'r', encoding='utf-8') as file:
         data = json.load(file)
-    
+
     if mode == 'all':
         return {**data.get('profile'), **data.get('search')}.get(arg_name)
     return data.get(mode).get(arg_name)
@@ -253,26 +309,17 @@
 def extract_arg_names_from_json(mode: str, json_path: str) -> List[str]:
     """提取配置文件parameter_config.json中的参数列表"""
-    
+
     def extract_fields(obj: dict, res: List[str]):
-        if mode == 'profile':
-            res.extend(iterate_args('profile', obj))
-        elif mode == 'search':
-            res.extend(iterate_args('search', obj))
-        elif mode == 'simulate':
-            res.extend(iterate_args('simulate', obj))
+        if mode == 'all':
+            res.extend(get_arg_names('profile', obj))
+            res.extend(get_arg_names('search', obj))
         else:
-            res.extend(iterate_args('profile', obj))
-            res.extend(iterate_args('search', obj))
-            res = list(set(res))
-
-    def iterate_args(mode: str, obj: dict) -> List[str]:
-        temp_args = []
-        for key, value in obj.items():
-            if key == mode:
-                temp_args = list(value.keys())
-                break
-        return temp_args
+            res.extend(get_arg_names(mode, obj))
+
+    def get_arg_names(mode: str, obj: dict) -> List[str]:
+        args_dict = obj.get(mode, {})
+        return list(args_dict.keys())
 
     with open(json_path, 'r', encoding='utf-8') as file:
         data = json.load(file)
@@ -320,4 +367,4 @@ def check_files_in_dir(path: str):
     """校验目录下存在文件"""
     if os.path.isdir(path) and len(os.listdir(path)) == 0:
         logger.error(f'No files in {path}')
-        raise Exception
+        raise Exception
\ No newline at end of file
diff --git a/profiler/msprof_analyze/tinker/version.py b/profiler/msprof_analyze/tinker/version.py
index 25a4f91a2974183737997ea3cd5a990036259b3c..6ae90438c24966ca1b64ac7034ad6bc1e7bd6bb3 100644
--- a/profiler/msprof_analyze/tinker/version.py
+++ b/profiler/msprof_analyze/tinker/version.py
@@ -20,7 +20,7 @@ import os
 
 # MAJOR和MINOR编号目前暂时跟随ModelLink, 如1.2指支持到1.0.RC2, PATCH版本随时更新
 PROFILER_VERSION_MAJOR = 1
 PROFILER_VERSION_MINOR = 3
-PROFILER_VERSION_PATCH = '1b'
+PROFILER_VERSION_PATCH = 3
 OPTIMIZER_VERSION_MAJOR = 1
 OPTIMIZER_VERSION_MINOR = 3
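
本次补丁将 `cost_model.py` 中各 stage 的 reserved 内存估计统一为 `BlockArgs.max_reserved_mem`(即 `fwd_reserved` 与 `bwd_reserved` 的较大值)。下面用一个极简示意复述 `calc_reserved_mem_costs` 的调度逻辑,其中函数名与数值均为示意,并非仓库代码:

```python
# 极简示意:按 pp 个 stage 给出 reserved 内存序列(first/last/other 分别对应
# 首、尾、中间 stage 的 max_reserved_mem,数值为假设)
def reserved_per_stage(first: float, last: float, other: float, pp: int) -> list:
    if pp == 1:
        # pp=1 时单个 stage 同时承担头尾,取三者最大值
        return [max(first, last, other)]
    # 首尾 stage 分别与中间 stage 的取值再取 max,中间 stage 统一取 other
    return [max(first, other)] + [other] * (pp - 2) + [max(last, other)]


print(reserved_per_stage(first=3.0, last=5.0, other=4.0, pp=4))  # [4.0, 4.0, 4.0, 5.0]
```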
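上文 `profile_args.py` 中 `new_from_file_name` 的关键步骤,是用正则把文件名里的「字母+数字」片段还原为字段,再丢弃 dataclass 中不存在的键。下面用一个假设的文件名演示该解析过程(文件名仅作示意):

```python
import re

# 假设的测量数据文件名,仅用于演示 new_from_file_name 中的解析逻辑
file_name = "model0tp2sp1mbs4"
parsed_data = {key: int(value) for key, value in re.findall(r"([a-zA-Z]+)(\d+)", file_name)}
print(parsed_data)  # {'model': 0, 'tp': 2, 'sp': 1, 'mbs': 4}
# 随后仅保留 ProfileArgs 中已定义的字段作为默认值覆盖,未定义的键会被忽略
```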
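新增的 `tinker/utils/constant.py` 通过别名表把多种版本写法归一到同一个 `Version` 枚举。下面给出一个最小用法示意(假设 `tinker` 包已在 `PYTHONPATH` 中可导入),断言依据上文 `VERSION_ALIASES` 与 `version_parse` 的实现:

```python
# 最小用法示意:不同写法经 strip/upper 并去除前缀 V 后落到同一枚举值
from tinker.utils.constant import Version, version_parse

assert version_parse("1.2") is Version.MindSpeed_LLM_1_0_rc3
assert version_parse("1.0.RC3") is Version.MindSpeed_LLM_1_0_rc3  # 同一版本的别名写法
assert version_parse("v1.0.rc1") is Version.MindSpeed_LLM_1_0_rc1  # 大小写与前缀不敏感
```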