diff --git a/debug/accuracy_tools/msprobe/docs/19.monitor.md b/debug/accuracy_tools/msprobe/docs/19.monitor.md
index 2374ef7680e59d5e85fe276dc8597ffb2f4bdbfd..3d4be725da17a333a8bd19a3bc8069bc291ba247 100644
--- a/debug/accuracy_tools/msprobe/docs/19.monitor.md
+++ b/debug/accuracy_tools/msprobe/docs/19.monitor.md
@@ -403,6 +403,17 @@ export MONITOR_OUTPUT_DIR=/xxx/output_dir
 },
 ```
 
+Note: when multiple anomaly alert rules are configured, the first rule takes priority. With the configuration below, every layer reports the AnomalyNan alert first (configuring multiple rules is generally not recommended):
+```json
+    "alert": {
+        "rules": [
+            {"rule_name": "AnomalyNan", "args": {"threshold": 1e10}},
+            {"rule_name": "AnomalyTurbulence", "args": {"threshold": 0.5}}
+        ],
+        "dump": true
+    },
+```
+
 2. Pass the pipeline-parallel group when instantiating the tool
 ```python
 monitor = TrainerMon(
@@ -430,12 +441,14 @@ monitor = TrainerMon(
 }
 ```
 
+Here, the xxx in call_{xxx} is the execution order of the API call; it is used later when sorting anomaly events.
+
 3. Sorting anomaly events
 
 When a training run produces a lot of anomalous data, the anomaly events need to be ranked. The tool provides top-k anomaly ranking, ordered by API execution order, which helps locate the first anomaly. Example anomaly analysis command:
 
 ```shell
-python3 -m msprobe.pytorch.monitor.anomaly_analyse -d $MONITOR_OUTPUT_DIR/anomaly_detected
+python3 -m msprobe.core.monitor.anomaly_processor -d $MONITOR_OUTPUT_DIR/anomaly_detected
 ```
 
 After the analysis finishes, the top-k events are written to `anomaly_detected/anomaly_analyse.json`. Anomaly analysis supports the following parameters:
diff --git a/debug/accuracy_tools/msprobe/mindspore/monitor/module_hook.py b/debug/accuracy_tools/msprobe/mindspore/monitor/module_hook.py
index 375314bff28bd25ed251baea68156395a2bc9ad9..be9f2e0e4ebaff8c100cda2d3d160e7949ae04cb 100644
--- a/debug/accuracy_tools/msprobe/mindspore/monitor/module_hook.py
+++ b/debug/accuracy_tools/msprobe/mindspore/monitor/module_hook.py
@@ -483,6 +483,8 @@ class TrainerMon:
         if (self.print_struct and not all(value == {} for value in self.module_struct.values())
                 and not self.struct_printed):
             self._save_module_struct()
+            if not self.cc_log_only:
+                raise Exception("exit after first monitor step when print model struct")
         if is_skip_step(context.step, self.start_step, self.step_interval, self.has_collect_times,
                         self.collect_times):
             return
diff --git a/debug/accuracy_tools/msprobe/pytorch/monitor/data_writers.py b/debug/accuracy_tools/msprobe/pytorch/monitor/data_writers.py
index bd6bde7e9f6ede789f520acc2138492e99bac509..6310cc18247c4d4b3dc2cf0940f20e431807f69b 100644
--- a/debug/accuracy_tools/msprobe/pytorch/monitor/data_writers.py
+++ b/debug/accuracy_tools/msprobe/pytorch/monitor/data_writers.py
@@ -80,6 +80,9 @@ class BaseWriterWithAD:
 
         xpu_stack = torch.stack(xpu_tensors).cpu() if xpu_tensors else torch.tensor([])
 
+        if xpu_stack.__class__.__name__ == 'DTensor':
+            xpu_stack = xpu_stack.to_local()
+
         # restore results in the original input order
         result = []
         cpu_tensors_idx, xpu_tensors_idx = 0, 0
diff --git a/debug/accuracy_tools/msprobe/pytorch/monitor/module_hook.py b/debug/accuracy_tools/msprobe/pytorch/monitor/module_hook.py
index 45011e59f33bfc5eb41699b25a72fe192ee7fc46..3e0721d3650875995c76644c20a443b84096ac2b 100644
--- a/debug/accuracy_tools/msprobe/pytorch/monitor/module_hook.py
+++ b/debug/accuracy_tools/msprobe/pytorch/monitor/module_hook.py
@@ -15,6 +15,7 @@
 import json
 import os
 import uuid
+import importlib
 from collections import defaultdict
 from datetime import datetime
 from functools import partial
@@ -50,6 +51,7 @@ torch_version_above_or_equal_2 = torch.__version__.split('+')[0] >= '2.0'
 if not torch_version_above_or_equal_2:
     raise ValueError("monitor require torch>=2.0")
 
+
 FORMAT_MAPPING = {
     MonitorConst.TENSORBOARD: SummaryWriterWithAD,
     MonitorConst.CSV: CSVWriterWithAD,
@@ -160,6 +162,7 @@ class TrainerMon:
         self.ratio_heatmap_visualizer = defaultdict(HeatmapVisualizer)
         self.origin_start_grad_sync = None
         self.fsdp_post_backward_hook = None
+        self.fsdp2_foreach_reduce = None
         self.config_timestamp = 0  # the timestamp is validated later; the first monitoring run does not need to touch the config file just to refresh it, monitoring can be enabled directly via the dynamic_on switch
         self.config = load_json(config_file_path)
         validate_config(self.config)
@@ -195,6 +198,7 @@ class TrainerMon:
         self.tp_group = None
         self.enable_megatron = False
         self.fsdp_wrapped_module = False
+        self.fsdp2_wrapped_module = False
         self.micro_batch_number = 1
         self.optimizer_mon = None
         self.optimizer_trans = None
@@ -209,6 +213,7 @@ class TrainerMon:
         self.param2name = defaultdict(str)
         self.name2indices = defaultdict()
         self.name2param = {}
+        self.origin2squash = {}
         self.duplicate_param = {}
         self.name2tag = {}
         self.param_name_call_id = {}
@@ -735,7 +740,6 @@ class TrainerMon:
                 hook(optimizer, args, kwargs)
                 step_final_hook(optimizer, args, kwargs)
                 return out
-
             return wrapper
 
         optimizer.__class__.step = patch_step(optimizer.__class__.step, optimizer)
@@ -820,6 +824,10 @@ class TrainerMon:
         elif self.fsdp_post_backward_hook:  # fsdp
             torch.distributed.fsdp._runtime_utils._post_backward_hook = self.fsdp_post_backward_hook
             logger.info("remove patch_post_backward_hook in fsdp.")
+        if self.fsdp2_foreach_reduce:  # fsdp2
+            torch.distributed.fsdp._fully_shard._fsdp_collectives.foreach_reduce = self.fsdp2_foreach_reduce
+            importlib.reload(torch.distributed.fsdp._fully_shard._fsdp_param_group)
+            logger.info("remove patch_foreach_reduce_hook in fsdp2.")
         else:  # not megatron and not fsdp
             for handle in self.handles['wgrads']:
                 handle.remove()
@@ -904,12 +912,15 @@ class TrainerMon:
                 continue
             if not self.fsdp_wrapped_module and param_name.startswith("_fsdp_wrapped_module"):
                 self.fsdp_wrapped_module = True
+            if not self.fsdp2_wrapped_module and param.__class__.__name__ == "DTensor":
+                self.fsdp2_wrapped_module = True
             if self._is_target_param(param_name, param, prefix):
                 name = prefix + squash_param_name(param_name, self.squash_name)
                 if name in self.param2name.values():
                     name = prefix + param_name
                 self.param2name[param] = name
                 self.name2param[name] = param
+                self.origin2squash[param_name] = name
 
                 if self.tp_group and not param_is_not_tensor_parallel_duplicate(param, self.tp_group):
                     self.duplicate_param[name] = True
@@ -1088,6 +1099,11 @@ class TrainerMon:
             self._patch_fsdp_post_backward_hook()
             return
 
+        if self.fsdp2_wrapped_module:
+            # patch fsdp2 _fully_shard._fsdp_collectives.foreach_reduce
+            self._patch_fsdp2_foreach_reduce()
+            return
+
         if self.monitor_mbs_grad:
             self._hook_weights()
             return
@@ -1121,7 +1137,6 @@ class TrainerMon:
         In every forward phase, FSDP re-registers hooks on AccumulateGrad, so hooks registered by the monitor tool never take effect;
        therefore _post_backward_hook is patched to collect gradients after backward and before reduce_scatter.
         """
-
         def patch_post_backward_hook(_post_backward_hook):
             def wrapper(state, handle, *unused):
                 grad_dict = {}
@@ -1140,7 +1155,6 @@ class TrainerMon:
                 get_metrics(self.ops, grad_dict, self.eps, self.grad_context.pre)
                 out = _post_backward_hook(state, handle, *unused)
                 return out
-
            return wrapper
 
         logger.info("Patch fsdp _post_backward_hook, collect pre_grad metrics.")
@@ -1148,6 +1162,28 @@ class TrainerMon:
         torch.distributed.fsdp._runtime_utils._post_backward_hook = \
             patch_post_backward_hook(torch.distributed.fsdp._runtime_utils._post_backward_hook)
 
+    def _patch_fsdp2_foreach_reduce(self):
+        def patch_foreach_reduce(foreach_reduce):
+            def wrapper(fsdp_params, unsharded_grads, *unused):
+                grad_dict = {}
+                for param, grad in zip(fsdp_params, unsharded_grads):
+                    tag = self.name2tag.get(self.origin2squash[param._param_fqn], {}).get(MonitorConst.PRE_GRAD)
+                    if tag is None:
+                        continue
+                    grad_dict[tag] = grad
+                    self.register_param_call_id("foreach_reduce", tag)
+                get_metrics(self.ops, grad_dict, self.eps, self.grad_context.pre)
+                out = foreach_reduce(fsdp_params, unsharded_grads, *unused)
+                return out
+            return wrapper
+
+        logger.info("Patch fsdp2 foreach_reduce, collect pre_grad metrics.")
+        import torch.distributed.fsdp._fully_shard._fsdp_param_group as _fsdp_param_group
+        import torch.distributed.fsdp._fully_shard._fsdp_collectives as _fsdp_collectives
+        self.fsdp2_foreach_reduce = _fsdp_collectives.foreach_reduce
+        _fsdp_collectives.foreach_reduce = patch_foreach_reduce(_fsdp_collectives.foreach_reduce)
+        importlib.reload(_fsdp_param_group)  # critical: torch imported foreach_reduce at import time, so without the reload the patch would not take effect
+
     def _hook_weights(self):
         """
         Walk each parameter's gradient accumulation function (grad_acc) and attach hooks so that, once all of the parameter's gradients have been computed, the gradients are collected before communication aggregation.
diff --git a/debug/accuracy_tools/msprobe/pytorch/monitor/optimizer_collect.py b/debug/accuracy_tools/msprobe/pytorch/monitor/optimizer_collect.py
index c2991839e5b9acef7c141d822493f20afaf40efe..5d7c5d8219e70bb5df3c7bd15de9ec4d4b356a4e 100644
--- a/debug/accuracy_tools/msprobe/pytorch/monitor/optimizer_collect.py
+++ b/debug/accuracy_tools/msprobe/pytorch/monitor/optimizer_collect.py
@@ -53,7 +53,7 @@ class OptimizerMon(object):
                 if param.numel() != element_in_cur_partition:
                     if first_param:
                         grad = grad.flatten()[-element_in_cur_partition:]
-                    else: # supposed to be the last one
+                    else:  # supposed to be the last one
                         grad = grad.flatten()[:element_in_cur_partition]
                 first_param = False
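
A note on the data_writers.py change: under FSDP2 the stacked tensors can be DTensor instances, which are converted to a plain local tensor before metrics are written. The sketch below illustrates only the duck-typed pattern used in the patch; `to_plain_tensor` is a hypothetical helper name, not msprobe code, and a real DTensor (which only exists under an initialized device mesh) would take the `to_local()` branch.

```python
import torch

def to_plain_tensor(stacked):
    # Duck-typed check, mirroring the patch: compare the class name instead of
    # importing DTensor, so this also works where FSDP2/DTensor is unavailable.
    if stacked.__class__.__name__ == 'DTensor':
        return stacked.to_local()  # drop the sharding wrapper, keep the local shard
    return stacked

x = torch.stack([torch.tensor(1.0), torch.tensor(2.0)])
print(to_plain_tensor(x))  # plain tensors pass through unchanged
```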
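
A note on the `importlib.reload(_fsdp_param_group)` call at the end of `_patch_fsdp2_foreach_reduce`: the module that calls `foreach_reduce` imported it with `from ... import` at torch import time, so it keeps its own reference, and re-binding the name in `_fsdp_collectives` alone does not affect it. Below is a self-contained sketch of that general Python behaviour; the module names `collectives` and `param_group` are stand-ins written to a temp directory, not torch internals.

```python
import importlib
import sys
import tempfile
from pathlib import Path

workdir = Path(tempfile.mkdtemp())
# Stand-in for _fsdp_collectives: defines foreach_reduce.
(workdir / "collectives.py").write_text("def foreach_reduce():\n    return 'original'\n")
# Stand-in for _fsdp_param_group: binds foreach_reduce at import time via from-import.
(workdir / "param_group.py").write_text(
    "from collectives import foreach_reduce\n"
    "def run():\n"
    "    return foreach_reduce()\n"
)
sys.path.insert(0, str(workdir))

import collectives
import param_group

def patch(original):
    def wrapper():
        return 'patched:' + original()
    return wrapper

# Re-bind the name in the defining module only.
collectives.foreach_reduce = patch(collectives.foreach_reduce)
print(param_group.run())       # 'original' -> the early from-import kept the old reference
importlib.reload(param_group)
print(param_group.run())       # 'patched:original' -> the reload re-runs the from-import
```

The same reasoning explains why the un-patch path in the diff reloads `_fsdp_param_group` again after restoring the original `foreach_reduce`.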