diff --git a/docs/pytorch/features/high_availability.md b/docs/pytorch/features/high_availability.md
new file mode 100644
index 0000000000000000000000000000000000000000..6a521494abc1e772465b2c7a208822f15cc15b29
--- /dev/null
+++ b/docs/pytorch/features/high_availability.md
@@ -0,0 +1,64 @@
+# Ascend High Availability
+
+## Use cases
+
+The distributed optimizer saves memory by sharding optimizer state evenly across the data-parallel group. Building on this idea, MindIO splits the data-parallel group into two replica data-parallel groups; the replica optimizer shards optimizer state evenly within each replica group, so every shard of optimizer state has a backup. Because this increases on-chip memory usage, the feature is recommended for large clusters (roughly a thousand cards or more), where it reduces the machine time lost to faults. Combined with Huawei's in-house high-availability framework, it provides the following capabilities:
+
+### TTP end-of-life ("last words") checkpoint
+
+When a fault occurs during training, the integrity and consistency of the optimizer's intermediate state are verified and a final checkpoint is written. Training can then resume from this checkpoint at the state immediately before the fault, reducing the number of training iterations lost to the failure.
+
+### UCE step-level recomputation
+
+Ascend chips support real-time detection of UCE faults (uncorrectable memory errors) in NPU device memory. When a UCE fault is detected, the optimizer-state replica mechanism is used to repair the faulty card online and training continues, limiting the training loss.
+
+### How it works
+
+The data flow and working principle of Megatron's native distributed optimizer are shown below:
+
+*(figure: grad buffer sharding in the distributed optimizer)*
+
+The replica optimizer distributes the optimizer parameters evenly across the replica DP groups so that the optimizer state is backed up, which provides the mechanism underlying the TTP and UCE features:
+
+*(figure: replica optimizer state layout)*
+
+Compared with the distributed optimizer, the replica optimizer uses more memory. The theoretical footprint in bytes per parameter (d is the data-parallel size) is:
+
+| Configuration                    | Non-distributed optim | Distributed optim | Replica optim |
+|----------------------------------|-----------------------|-------------------|---------------|
+| fp16/bf16 param, fp16/bf16 grads | 20                    | 4 + 16/d          | 4 + 32/d      |
+| fp16/bf16 param, fp32 grads      | 18                    | 6 + 12/d          | 6 + 24/d      |
+
+## Usage
+
+### Environment setup
+
+MindIO is shipped as a Python wheel (whl) package.
+
+mindio_ttp download: [MindIO TTP software package download - Ascend Community](https://www.hiascend.com/document/detail/zh/mindx-dl/600/clusterscheduling/ref/mindioacp/mindioacp009.html)
+
+### Launch flags added to the startup script
+
+`--enable-high-availability` # master switch for the high-availability feature; also enables the TTP end-of-life checkpoint. Saving a checkpoint requires that at least one complete copy of the optimizer state exists globally.
+
+`--enable-hbmfault-repair` # enables step-level recomputation for on-chip (HBM) memory faults. Repair is performed online at worker level and requires that at least one replica card of the faulty card exists globally.
+
+`--enable-worker-reboot` # enables "in-flight refueling": together with the MindX DL components that support it, general faults are repaired by a process-level restart and training continues. The node hosting the faulty card is rebooted; repair requires that the non-faulty nodes still hold at least one complete copy of the optimizer state.
+
+`--distributed-optimizer-no-replica` # do not use the replica optimizer; recomputation and process-restart repair are driven by checkpoint (CKPT) files instead, so a checkpoint file must exist at the time of the fault.
+
+### Environment variables added to the startup script
+
+To avoid configuring switches in several components when used together with MindX, an environment variable is provided. It has higher priority than the command-line args: when set, the environment variable takes precedence.
+
+`export HIGH_AVAILABILITY=dump` enables `--enable-high-availability`
+
+`export HIGH_AVAILABILITY=retry` enables `--enable-high-availability` `--enable-hbmfault-repair`
+
+`export HIGH_AVAILABILITY=recover` enables `--enable-high-availability` `--enable-worker-reboot`
+
+## Constraints
+
+By design, a complete copy of the optimizer state must survive a fault. The PTD parallelism split must therefore keep the Data Parallel Size greater than 1; with the MoE feature, the Data Parallel Size of both the dense and the sparse (expert) layers must be greater than 1; with long-sequence (context) parallelism, dp_cp_size must also be greater than 1.
+
+See: [MindIO TTP constraints - Ascend Community](https://www.hiascend.com/document/detail/zh/mindx-dl/600/clusterscheduling/ref/mindiottp/mindiotft005.html)
\ No newline at end of file
diff --git a/mindspeed_llm/core/distributed/param_and_grad_buffer.py b/mindspeed_llm/core/distributed/param_and_grad_buffer.py
index 448e94a0f197c4eef48d08d73c1308b2d1f95dfe..cb548c77d1a4274f79b7406d71cbab2a6d5a159b 100644
--- a/mindspeed_llm/core/distributed/param_and_grad_buffer.py
+++ b/mindspeed_llm/core/distributed/param_and_grad_buffer.py
@@ -38,18 +38,20 @@ def start_param_sync(self, force_sync: bool = False):
     async_op = self.ddp_config.overlap_param_gather and not force_sync
     self.param_gather_handle = []
     # Coalesce communication kernels across buckets in the bucket group.
- + instance_group = self.intra_distributed_optimizer_instance_group_for_tft() + instance_rank = torch.distributed.get_rank( + group=instance_group + ) + instance_size = torch.distributed.get_world_size( + group=instance_group) for bucket in self.buckets: - instance_rank = torch.distributed.get_rank( - group=self.intra_distributed_optimizer_instance_group_for_tft() - ) local_data_view = shard_buffer( - bucket.param_data, self.intra_distributed_optimizer_instance_size_for_tft + bucket.param_data, instance_size )[instance_rank] handle = dist_all_gather_func( bucket.param_data, local_data_view, - group=self.intra_distributed_optimizer_instance_group_for_tft(), + group=instance_group, async_op=async_op, ) self.param_gather_handle.append(handle) @@ -71,22 +73,8 @@ def param_and_grad_bucket_group_init_wrapper(fn): is_expert_parallel |= not getattr(param, 'allreduce', True) from mindio_ttp.adaptor import (ttp_get_dp_cp_replica_group, ttp_get_dp_ep_replica_group) if self.ddp_config.use_distributed_optimizer: - if not is_expert_parallel: - self.intra_distributed_optimizer_instance_group_for_tft = ttp_get_dp_cp_replica_group - self.intra_distributed_optimizer_instance_size_for_tft = torch.distributed.get_world_size( - group=ttp_get_dp_cp_replica_group() - ) - self.intra_distributed_optimizer_instance_rank_for_tft = torch.distributed.get_rank( - group=ttp_get_dp_cp_replica_group() - ) - else: - self.intra_distributed_optimizer_instance_group_for_tft = ttp_get_dp_ep_replica_group - self.intra_distributed_optimizer_instance_size_for_tft = torch.distributed.get_world_size( - group=ttp_get_dp_ep_replica_group() - ) - self.intra_distributed_optimizer_instance_rank_for_tft = torch.distributed.get_rank( - group=ttp_get_dp_ep_replica_group() - ) + self.intra_distributed_optimizer_instance_group_for_tft = ttp_get_dp_cp_replica_group \ + if not is_expert_parallel else ttp_get_dp_ep_replica_group return return wrapper diff --git a/mindspeed_llm/core/optimizer/distrib_optimizer.py b/mindspeed_llm/core/optimizer/distrib_optimizer.py index a8e731a8670be3e3f387f84a101d207d92f28e4d..bf7fbfd8d5ad64538613632c113c73ab52385f29 100644 --- a/mindspeed_llm/core/optimizer/distrib_optimizer.py +++ b/mindspeed_llm/core/optimizer/distrib_optimizer.py @@ -13,7 +13,7 @@ from megatron.core.optimizer.grad_scaler import MegatronGradScaler from megatron.core.optimizer import OptimizerConfig from megatron.core.optimizer.optimizer import MixedPrecisionOptimizer from megatron.core.distributed.param_and_grad_buffer import partition_buckets -from mindspeed.optimizer.distrib_optimizer import reuse_fp32_param_distrib_optimizer_init_wrapper +from mindspeed.core.memory.reuse_param.adaptor import reuse_fp32_param_distrib_optimizer_init_wrapper from megatron.core.optimizer.cpu_offloading import HybridDeviceOptimizer from megatron.core.transformer.module import MegatronModule from megatron.core.config_logger import has_config_logger_enabled, log_config_to_disk @@ -172,7 +172,10 @@ def get_parameter_state_dp_zero_with_high_availability_wrapper(func): # gather buffer res buffer_res_full_shard = [] for shard_main_param_res_buffer in self.shard_main_param_res_buffers: - if global_rank == save_rank: + if self.disable_gloo_group and global_rank == save_rank: + recv_tensors = [torch.empty(shard_main_param_res_buffer.numel(), dtype=torch.float16, device="cpu") + for _ in range(len(save_rank_list))] + elif global_rank == save_rank: recv_tensors = [torch.empty((shard_main_param_res_buffer.numel(),), dtype=torch.float16, device="cpu") for _ in 
range(len(save_rank_list))] else: @@ -182,12 +185,20 @@ def get_parameter_state_dp_zero_with_high_availability_wrapper(func): send_tensor_bf16_view = torch.tensor(send_tensor.data.untyped_storage(), dtype=torch.bfloat16, device=send_tensor.device) send_tensor_bf16_view.copy_(shard_main_param_res_buffer.detach().cpu()) # gather支持fp16 - torch.distributed.gather( - send_tensor, - recv_tensors, - save_rank, - save_group_gloo, - ) + if self.disable_gloo_group: + from mindspeed.utils import _gather_hccl + _gather_hccl( + send_tensor, + recv_tensors, + self.data_parallel_group + ) + else: + torch.distributed.gather( + send_tensor, + recv_tensors, + save_rank, + save_group_gloo, + ) if global_rank == save_rank: res = [] for i in range(len(save_rank_list)): diff --git a/mindspeed_llm/features_manager/__init__.py b/mindspeed_llm/features_manager/__init__.py index f5b7f292be9efa56abfe3262a45d31eeb5032fab..f4a9b97e2b53c750606368ee3378d095826c0377 100644 --- a/mindspeed_llm/features_manager/__init__.py +++ b/mindspeed_llm/features_manager/__init__.py @@ -250,6 +250,12 @@ def add_disable_gloo_group_feature(features_list: List[MindSpeedFeature]): ]) +def add_high_availability_feature(features_list: List[MindSpeedFeature]): + features_list.extend([ + HighAvailabilityFeature() + ]) + + def create_features_list(): features_list = [] add_megatron_basic_features(features_list) @@ -270,6 +276,7 @@ def create_features_list(): add_optimizer_features(features_list) add_swap_optimizer_feature(features_list) add_disable_gloo_group_feature(features_list) + add_high_availability_feature(features_list) return features_list diff --git a/mindspeed_llm/features_manager/high_availability/high_availability.py b/mindspeed_llm/features_manager/high_availability/high_availability.py index 25da2b1881f5bdc466b120d6bef0e86d15de3965..c1a73692ac2da98651fa40bb5d6b0a4547662a48 100644 --- a/mindspeed_llm/features_manager/high_availability/high_availability.py +++ b/mindspeed_llm/features_manager/high_availability/high_availability.py @@ -18,6 +18,10 @@ class HighAvailabilityFeature(MindSpeedFeature): group.add_argument('--distributed-optimizer-no-replica', action='store_true', help='high availability feature, repair from ckpt and disable replica optimizer') + def pre_validate_args(self, args): + from .high_availability_helper import get_env_args + get_env_args(args) + def validate_args(self, args): if args.enable_high_availability: try: @@ -33,6 +37,15 @@ class HighAvailabilityFeature(MindSpeedFeature): if args.swap_optimizer and args.enable_high_availability: raise AssertionError('switch of the high availability feature is unsupported') + def pre_register_patches(self, patch_manager, args): + from .communication_patch import communication_wrapper + from .high_availability_helper import skip_reuse_register_patches + for communication in ['barrier', 'all_reduce', '_all_gather_base', 'broadcast', 'all_gather_into_tensor']: + patch_manager.register_patch('torch.distributed.distributed_c10d.' 
+ communication, + communication_wrapper) + from mindspeed.features_manager import ReuseFP32Param + ReuseFP32Param.register_patches = skip_reuse_register_patches(ReuseFP32Param.register_patches, args) + def register_patches(self, patch_manager, args): from .initialize_patch import setup_model_and_optimizer_wrapper, initialize_distributed_wrapper from mindspeed_llm.core import (start_grad_sync_wrapper, distributed_data_parallel_init_wrapper, @@ -65,10 +78,7 @@ class HighAvailabilityFeature(MindSpeedFeature): patch_manager.register_patch('megatron.core.pipeline_parallel.schedules.get_forward_backward_func', high_availability_get_forward_backward_func_wrapper) if args.reuse_fp32_param: - from mindspeed.optimizer.optimizer import mixed_precision_optimizer_step, reuse_fp32_param_init_wrapper, \ - optimizer_config_init_wrapper - patch_manager.register_patch('megatron.core.optimizer.optimizer.MixedPrecisionOptimizer.step', - mixed_precision_optimizer_step) + from mindspeed.core.memory.reuse_param.adaptor import reuse_fp32_param_init_wrapper, optimizer_config_init_wrapper patch_manager.register_patch('megatron.core.optimizer.optimizer.Float16OptimizerWithFloat16Params.__init__', reuse_fp32_param_init_wrapper) patch_manager.register_patch('megatron.core.optimizer.optimizer_config.OptimizerConfig.__init__', @@ -84,14 +94,3 @@ class HighAvailabilityFeature(MindSpeedFeature): build_train_valid_test_data_iterators_wrapper) patch_manager.register_patch('torch.distributed.distributed_c10d.new_group', new_group_wrapper) - - -class HighAvailabilityCommFeature(MindSpeedFeature): - def __init__(self): - super(HighAvailabilityCommFeature, self).__init__(feature_name='high-availability-comm', optimization_level=0) - - def pre_patch(self, patch_manager, args): - from mindspeed_llm.features_manager.high_availability.communication_patch import communication_wrapper - for communication in ['barrier', 'all_reduce', '_all_gather_base', 'broadcast', 'all_gather_into_tensor']: - patch_manager.register_patch('torch.distributed.distributed_c10d.' 
+ communication, - communication_wrapper) diff --git a/mindspeed_llm/features_manager/high_availability/high_availability_helper.py b/mindspeed_llm/features_manager/high_availability/high_availability_helper.py new file mode 100644 index 0000000000000000000000000000000000000000..eb1d3a5cf0ec0b73298f4fdf7e03e52ab06403c1 --- /dev/null +++ b/mindspeed_llm/features_manager/high_availability/high_availability_helper.py @@ -0,0 +1,26 @@ +import os +import warnings +from functools import wraps + + +def get_env_args(args): + env = os.getenv('HIGH_AVAILABILITY', '') + if env.lower() in ('dump', 'recover', 'retry'): + if not getattr(args, 'enable_high_availability', False): + warnings.warn( + "HIGH_AVAILABILITY environment variables enabled and args.enable_high_availability inactive" + ) + args.enable_high_availability = True + if env.lower() == 'recover': + args.enable_worker_reboot = True + if env.lower() == 'retry': + args.enable_hbmfault_repair = True + return args + + +def skip_reuse_register_patches(fn, argument): + @wraps(fn) + def wrapper(self, *args, **kwargs): + if not argument.enable_high_availability: + fn(self, *args, **kwargs) + return wrapper diff --git a/mindspeed_llm/tasks/megatron_adaptor.py b/mindspeed_llm/tasks/megatron_adaptor.py index 170b427651299ce206780b95fb0b62647a1bf610..6a7763fe276e628091a7399cd9df89fd0686cbce 100644 --- a/mindspeed_llm/tasks/megatron_adaptor.py +++ b/mindspeed_llm/tasks/megatron_adaptor.py @@ -126,10 +126,10 @@ class MegatronAdaptation: from mindspeed.patch_utils import MindSpeedPatchesManager from mindspeed.features_manager.megatron_basic.requirements_basic import RequirementsBasicFeature from mindspeed.features_manager.megatron_basic.transformer_engine_basic import TransformerEngineBasicFeature - from mindspeed_llm.features_manager.high_availability.high_availability import HighAvailabilityCommFeature + from mindspeed_llm.features_manager.high_availability.high_availability import HighAvailabilityFeature requirements = RequirementsBasicFeature() te_feature = TransformerEngineBasicFeature() - high_availability_comm_patch = HighAvailabilityCommFeature() + high_availability_comm_patch = HighAvailabilityFeature() # For torch >= 2.2.0 torch.compile = torch.jit.script @@ -147,7 +147,7 @@ class MegatronAdaptation: requirements.apex_adaptation(MindSpeedPatchesManager, _get_dummy_args()) requirements.torch_adaptation(MindSpeedPatchesManager, _get_dummy_args()) te_feature.pre_register_patches(MindSpeedPatchesManager, _get_dummy_args()) - high_availability_comm_patch.pre_patch(MindSpeedPatchesManager, _get_dummy_args()) + high_availability_comm_patch.pre_register_patches(MindSpeedPatchesManager, _get_dummy_args()) MindSpeedPatchesManager.apply_patches() @classmethod diff --git a/mindspeed_llm/tasks/megatron_adaptor_v2.py b/mindspeed_llm/tasks/megatron_adaptor_v2.py index 1598e71d90a54d88172d8ece6675c8dcefb02d12..7a068c0be7ead2337abc79f8f2cf9be40b6c7c12 100644 --- a/mindspeed_llm/tasks/megatron_adaptor_v2.py +++ b/mindspeed_llm/tasks/megatron_adaptor_v2.py @@ -54,8 +54,10 @@ class FeatureAdaptor: return cls._args from mindspeed_llm.training.arguments import process_args_v2 + from mindspeed_llm.features_manager.high_availability.high_availability_helper import get_env_args parser = argparse.ArgumentParser(description='MindSpeed-LLM Arguments', allow_abbrev=False) _args, unknown = process_args_v2(parser).parse_known_args() + get_env_args(_args) parser_unknown_args(_args, unknown) return _args diff --git a/mindspeed_llm/training/arguments.py 
b/mindspeed_llm/training/arguments.py index 3bb66e4ed20f23d36afeac1d8d59d290f645357a..fa83db1a39017dbd5b1f59608372c241e0a024d3 100644 --- a/mindspeed_llm/training/arguments.py +++ b/mindspeed_llm/training/arguments.py @@ -1292,7 +1292,6 @@ def _add_dummy_args_v2(args): args.return_document_ids = False args.attention_mask_on_cpu = False args.output_layer_slice_num = 1 - args.enable_high_availability = False args.use_fused_mlp = False diff --git a/mindspeed_llm/training/training.py b/mindspeed_llm/training/training.py index c12dd9e7f9e7de642279a3e8e31152075db5cbdb..f2f55a599bc8e54630a68bc7903cf5c541ca8d70 100644 --- a/mindspeed_llm/training/training.py +++ b/mindspeed_llm/training/training.py @@ -757,6 +757,10 @@ def train(forward_step_func, model, optimizer, opt_param_scheduler, if is_profile_enabled(): prof.step() + if args.enable_high_availability: + from mindio_ttp.framework_ttp import tft_pause_train + tft_pause_train(iteration) + if is_profile_enabled(): prof.stop() diff --git a/sources/images/high_availability/grad_buffer_sharding.png b/sources/images/high_availability/grad_buffer_sharding.png new file mode 100644 index 0000000000000000000000000000000000000000..2dfbd3b0cf8f0b8601a05bc1d6985a50b2046cc8 Binary files /dev/null and b/sources/images/high_availability/grad_buffer_sharding.png differ diff --git a/sources/images/high_availability/replica_optimizer.png b/sources/images/high_availability/replica_optimizer.png new file mode 100644 index 0000000000000000000000000000000000000000..bc9b12881843c1f421a6e8d8a6ac987416b60d96 Binary files /dev/null and b/sources/images/high_availability/replica_optimizer.png differ
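
As a quick illustration of the memory table in docs/pytorch/features/high_availability.md, the following minimal sketch compares the three schemes. It assumes the table values are bytes per model parameter and that `d` is the (replica) data-parallel size; the function name is illustrative, not part of the patch.

```python
# Bytes of parameter/gradient/optimizer state per model parameter, per the doc table.
# Assumption: fp16/bf16 params with fp16/bf16 grads; d = data-parallel size.
def bytes_per_param(d: int) -> dict:
    return {
        "non_distributed_optim": 20,
        "distributed_optim": 4 + 16 / d,
        "replica_optim": 4 + 32 / d,  # replica optimizer keeps a second sharded copy of the state
    }


if __name__ == "__main__":
    for d in (8, 64, 1024):
        print(d, bytes_per_param(d))
```

At large `d` the replica optimizer's overhead over the plain distributed optimizer shrinks toward 16/d bytes per parameter, which is why the feature is recommended for thousand-card-scale clusters.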
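
The precedence of the `HIGH_AVAILABILITY` environment variable over the CLI switches can be exercised directly through the `get_env_args` helper added in this patch. A minimal usage sketch, assuming `mindspeed_llm` is importable in the current environment:

```python
import os
from argparse import Namespace

# Helper added by this patch.
from mindspeed_llm.features_manager.high_availability.high_availability_helper import get_env_args

# 'retry' implies --enable-high-availability plus --enable-hbmfault-repair,
# overriding the (unset) CLI switches; a warning is emitted about the override.
os.environ["HIGH_AVAILABILITY"] = "retry"

args = Namespace(enable_high_availability=False,
                 enable_hbmfault_repair=False,
                 enable_worker_reboot=False)
args = get_env_args(args)

assert args.enable_high_availability and args.enable_hbmfault_repair
assert not args.enable_worker_reboot  # only 'recover' turns this on
```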
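
The reworked `start_param_sync` shards each bucket across the replica instance group returned by `ttp_get_dp_cp_replica_group` / `ttp_get_dp_ep_replica_group` and all-gathers it back. Below is a standalone sketch of that pattern using plain `torch.distributed`; the helper name `gather_bucket_params` and the even-sharding assumption are illustrative, not part of the patch.

```python
import torch
import torch.distributed as dist


def gather_bucket_params(param_data: torch.Tensor, instance_group: dist.ProcessGroup) -> None:
    """All-gather one bucket's parameter shards within a replica instance group.

    Illustrative only: in the patch, shard_buffer() produces the local view and
    the group comes from mindio_ttp. Assumes the flat bucket is padded so its
    size is a multiple of the group size.
    """
    rank = dist.get_rank(group=instance_group)
    world = dist.get_world_size(group=instance_group)
    # Each rank owns one equal, contiguous slice of the flat bucket.
    local_shard = param_data.view(world, -1)[rank]
    # Fill in the rest of the bucket from the other ranks in the same replica group.
    dist.all_gather_into_tensor(param_data, local_shard, group=instance_group)
```

Hoisting the rank/size lookups out of the per-bucket loop, as the patch does, avoids repeated collective-group queries while keeping the per-bucket all-gather unchanged.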