diff --git a/debug/accuracy_tools/kj600/README.md b/debug/accuracy_tools/kj600/README.md
index 1782e58bec0404092bb8c6784a8235c68f536ac9..be01be25d1cc49441da989d2b18b27665587919c 100644
--- a/debug/accuracy_tools/kj600/README.md
+++ b/debug/accuracy_tools/kj600/README.md
@@ -34,7 +34,50 @@
 cd kj600
 pip install .
 ```
 
-# Quick Start
+## Quick Start
+### Gradient Monitoring
+Anomalies in the training state of a model usually show up in the loss and in the gradients. Monitoring the gradients of each module helps locate the first scene of an anomaly quickly.
+
+1. Configure the directory that TensorBoard writes to.
+Monitoring results are written to TensorBoard event files; set the output path (the default is `kj600_output`):
+
+```bash
+export KJ600_OUTPUT_DIR=/xxx/output_dir
+tensorboard --logdir=$KJ600_OUTPUT_DIR
+```
+
+2. Enable the tool in the training script:
+
+```
+from kj600.module_hook import TrainerMon
+
+model, optimizer, opt_param_scheduler = setup_model_and_optimizer(
+    model_provider, model_type)
+# insert the tool code after the model has been initialized
+hooker = TrainerMon("./monitor_config.json", params_have_main_grad=True)
+hooker.hook_modules(model=model, grad_acc_steps=args.global_batch_size//args.data_parallel_size//args.micro_batch_size)
+```
+
+3. Configure the tool in the JSON file. Note that JSON does not allow comments, so two fields are explained here: if you are unfamiliar with the model structure, set `print_struct` to true to print it; `module_ranks` lists the ranks to monitor.
+```
+{
+    "targets": {
+        "module.language_model.encoder.layers.0": {}
+    },
+    "print_struct": false,
+    "module_ranks": [0,1,2,3],
+    "wg_distribution": true,
+    "alert": {
+        "rules": [{"rule_name": "AnomalyTurbulence", "args": {"threshold": 0.5}}]
+    },
+    "ops": ["norm"],
+    "eps": 1e-8
+}
+```
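+
+If you only need weight-gradient monitoring, the `monitor_gnorm_with_ad` interface added in this patch hooks the targeted parameters directly. A minimal sketch, assuming the same Megatron-style `args` as in step 2 (adapt the names to your setup):
+
+```
+from kj600.module_hook import TrainerMon
+
+hooker = TrainerMon("./monitor_config.json", params_have_main_grad=True)
+# registers per-parameter gradient hooks and the optimizer-step hook
+hooker.monitor_gnorm_with_ad(
+    model=model,
+    grad_acc_steps=args.global_batch_size // args.data_parallel_size // args.micro_batch_size)
+```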
+
+
+## Detailed Configuration
 
 The following takes the Ascend/ModelLink training framework as an example to show how to use the kj600 tool.
 
@@ -53,8 +96,7 @@
     "wg_distribution": true,
     "cc_distribution": {"enable":true, "cc_codeline":[]},
     "alert": {
-        "rules": [{"rule_name": "AnomalyTurbulence", "args": {"threshold": 0.5}}],
-        "inform": {"recipient": "database", "connection_str": "mysql+pymysql://username:password@host:port/database"}
+        "rules": [{"rule_name": "AnomalyTurbulence", "args": {"threshold": 0.5}}]
     },
     "ops": ["min", "max", "norm", "zeros", "id"],
     "eps": 1e-8
diff --git a/debug/accuracy_tools/kj600/kj600/module_hook.py b/debug/accuracy_tools/kj600/kj600/module_hook.py
index 3b600b2b7f28638d0df65305d933a765b4aa45d8..cad65a9d8f595d00064a1d6ef17866466ba33edc 100644
--- a/debug/accuracy_tools/kj600/kj600/module_hook.py
+++ b/debug/accuracy_tools/kj600/kj600/module_hook.py
@@ -2,6 +2,7 @@ import os
 import uuid
 import json
 from collections import defaultdict
+from functools import partial
 from datetime import datetime
 import torch
 import torch.distributed as dist
@@ -16,6 +17,11 @@ from kj600.module_metric import get_metrics, write_metrics_tensorboard, get_summ
 from kj600.distributed.wrap_distributed import api_register, create_hooks, op_aggregate
 from kj600.utils import print_warn_log, print_info_log, get_param_struct
 
+if torch.cuda.is_available():
+    DEVICE = 'cuda'
+elif hasattr(torch, 'npu') and torch.npu.is_available():
+    DEVICE = 'npu'
+else:
+    DEVICE = 'cpu'  # fallback so DEVICE is always defined on CPU-only hosts
+
 
 class ModuleHookContext:
     def __init__(self, module_name) -> None:
@@ -30,20 +36,21 @@ class ModuleHookContext:
         self.focused_out_col = 0
         self.ignore_in = False # no need to care when no key 'input' or 'input_grad' found
 
-    def set_format_by_arg(self, key_name:str, target_config:dict):
+    def set_format_by_arg(self, key_name: str, target_config: dict):
         if key_name in target_config[self.module_name]:
             self.format_by_arg[key_name] = target_config[self.module_name][key_name]
         elif key_name in ['input', 'input_grad']:
             self.ignore_in = True
         else:
-            raise KeyError(f"Missing key: {key_name} of {self.module_name} in config.json")
+            raise KeyError(
+                f"Missing key: {key_name} of {self.module_name} in config.json")
 
 
 class OptimizerContext:
     def __init__(self) -> None:
         self.step = 0
         self.param_effective_rank = defaultdict(float)
-        self.param_mg_direction = defaultdict(float) 
+        self.param_mg_direction = defaultdict(float)
         self.param_adam_update = defaultdict()
         self.param_adam_ratio = defaultdict()
         self.param_weight_grad = defaultdict()
@@ -71,6 +78,20 @@ class CommunicationContext:
     def aggregate(self):
         self.data = self._agg(self.data)
 
+
+class GradContext:
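+    # Per-parameter gradient statistics collected between optimizer steps:
+    # `pre` holds stats of the locally accumulated gradient (grad_acc, filled
+    # by the tensor hooks registered in _hook_weights); `post` holds stats of
+    # param.main_grad/param.grad as seen right before optimizer.step().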
") self.mg_direction = self.config.get('mg_direction', False) if not self.mg_direction: print_rank_0('> grad and momentum direction will not be compared.') @@ -117,14 +142,16 @@ class TrainerMon: self.cc_log_only = self.cc_distribution.get('cc_log_only', False) self.cc_logged_stack = defaultdict(set) self.cc_pre_hook = self.cc_distribution.get('cc_pre_hook', False) - api_register.initialize_hook(*create_hooks(context=self.cc_context, monitor=self)) + api_register.initialize_hook( + *create_hooks(context=self.cc_context, monitor=self)) api_register.redirect_api() - alert_setting = self.config.get('alert', {"rules":[]}) + alert_setting = self.config.get('alert', {"rules": []}) self.alert_rules = AnomalyScanner.load_rules(alert_setting["rules"]) - - anomaly_inform = AnomalyInformFactory.create_informer(**alert_setting["inform"]) if "inform" in alert_setting else None - + + anomaly_inform = AnomalyInformFactory.create_informer( + **alert_setting["inform"]) if "inform" in alert_setting else None + self.optimizer_hooked = False output_base_dir = os.getenv('KJ600_OUTPUT_DIR', './kj600_output') cur_time = datetime.now().strftime('%b%d_%H-%M-%S') @@ -134,7 +161,8 @@ class TrainerMon: self.summary_writer = SummaryWriterWithAD( os.path.join(output_base_dir, f"{cur_time}-rank{dist.get_rank()}-{unique_id}"), self.alert_rules, unique_id, anomaly_inform) else: - self.summary_writer = SummaryWriterWithAD(os.path.join(output_base_dir, f"{cur_time}-{unique_id}"), self.alert_rules, unique_id, anomaly_inform) + self.summary_writer = SummaryWriterWithAD(os.path.join( + output_base_dir, f"{cur_time}-{unique_id}"), self.alert_rules, unique_id, anomaly_inform) # A HeatmapVisualizer instance is associated with an image self.update_heatmap_visualizer = defaultdict(HeatmapVisualizer) self.ratio_heatmap_visualizer = defaultdict(HeatmapVisualizer) @@ -143,12 +171,15 @@ class TrainerMon: self.param_name_list = [] self.param2name = defaultdict(str) - self.mix_precision_optimizer_mon = OptimizerMonFactory.create_optimizer_mon(opt_ty) + self.mix_precision_optimizer_mon = OptimizerMonFactory.create_optimizer_mon( + opt_ty) if opt_ty is None: if self.ur_distribution: - raise Exception("ur_distribution cannot be enabled with unknown optimizer.") + raise Exception( + "ur_distribution cannot be enabled with unknown optimizer.") if self.mv_distribution: - raise Exception("mv_distribution cannot be enabled with unknown optimizer.") + raise Exception( + "mv_distribution cannot be enabled with unknown optimizer.") self.print_struct = self.config.get("print_struct", False) self.struct_printed = False self.module_struct = {} @@ -157,32 +188,35 @@ class TrainerMon: def __del__(self): if hasattr(self, "summary_writer"): self.summary_writer.close() - + @staticmethod def set_wrapped_optimizer(_wrapped_optimizer): MixPrecsionOptimizerMon.set_wrapped_optimizer(_wrapped_optimizer) @staticmethod - def adhoc_check(target_tensor:torch.tensor, module_name:str, tensor_name:str, rank_list, ops_list): + def adhoc_check(target_tensor: torch.tensor, module_name: str, tensor_name: str, rank_list, ops_list): rank = None if dist.is_initialized(): rank = dist.get_rank() if (rank not in rank_list) and len(rank_list) != 0: return - TrainerMon.tensor_metrics.stat_insert(target_tensor, ops_list, module_name, tensor_name, rank) + TrainerMon.tensor_metrics.stat_insert( + target_tensor, ops_list, module_name, tensor_name, rank) - def hook_modules(self, model:torch.nn.Module, grad_acc_steps): + def hook_modules(self, model: torch.nn.Module, grad_acc_steps): # 
         # fwd=0, bkd=1
-        # targets is module name list like ["xx.xxx1", "xxx.xxx2"] which can be obtained when first run.
+        # targets is a module name list like ["xx.xxx1", "xxx.xxx2"] which can be obtained on the first run.
         print_rank_0("> module names:")
         for name, _ in model.named_modules():
             print_rank_0(f"\t{name}")
         self.micro_batch_number = grad_acc_steps
         if not self.module_rank_list or (dist.is_initialized() and dist.get_rank() in self.module_rank_list):
-            targets = [x for x, _ in model.named_modules()] if self.print_struct else self.config['targets'].keys()
+            targets = [x for x, _ in model.named_modules()] \
+                if self.print_struct else self.config['targets'].keys()
             hooked_count = self._hook_module(targets, model, fwd_or_bkd=0)
-            print_rank_0(f"> {hooked_count} out of {len(self.config['targets'])} are monitored.")
+            print_rank_0(
+                f"> {hooked_count} out of {len(self.config['targets'])} are monitored.")
         else:
             return
@@ -192,12 +226,18 @@
         for name, param in model.named_parameters():
             print_rank_0(f"\t{name}")
             for target_module, _ in self.config['targets'].items():
-                if name.startswith(target_module): # name : language_model.encoder.layers.0.mlp.weight, target_module:language_model.encoder.layers.0
+                # name: language_model.encoder.layers.0.mlp.weight, target_module: language_model.encoder.layers.0
+                if name.startswith(target_module):
                     self.param_name_list.append(name)
                     self.param2name[param] = name
         self.hook_optimizer()
         return
 
+    def monitor_gnorm_with_ad(self, model, grad_acc_steps):
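+        # Entry point for weight-gradient monitoring with anomaly detection:
+        # register per-parameter accumulation hooks, hook the optimizer step,
+        # and record how many micro batches make up one global step.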
+        self._hook_weights(model)
+        self.hook_optimizer()
+        self.micro_batch_number = grad_acc_steps
+
     def build_tbtag_tensor_map(self, module_name, tag, tensor):
         metrics = {}
         rank = dist.get_rank() if dist.is_initialized() else None
@@ -215,7 +255,7 @@
                 continue
             metrics[key] = param_tensor[name]
         return metrics
-
+
     def generate_cc_metrics(self, cc_name, cc_tensor):
         metrics = defaultdict(dict)
         rank = dist.get_rank() if dist.is_initialized() else None
@@ -234,66 +274,105 @@
             return
         for _, fwd_context in self.module_fwd_hook_context_by_module.items():
             if not len(fwd_context.actv) == self.micro_batch_number:
-                print_warn_log(f"fwd_context.actv not equal to micro_batch_number: {len(fwd_context.actv)}, {self.micro_batch_number}")
+                print_warn_log(
+                    f"fwd_context.actv not equal to micro_batch_number: {len(fwd_context.actv)}, {self.micro_batch_number}")
             for metric_name in self.ops:
-                write_metrics_tensorboard(metric_name, self.summary_writer, fwd_context.actv, step)
+                write_metrics_tensorboard(
+                    metric_name, self.summary_writer, fwd_context.actv, step)
             fwd_context.actv.clear()
         for _, bwd_context in self.module_bwd_hook_context_by_module.items():
             if not len(bwd_context.actvgrad) == self.micro_batch_number:
-                print_warn_log(f"bwd_context.actvgrad not equal to micro_batch_number: {len(bwd_context.actvgrad)}, {self.micro_batch_number}")
+                print_warn_log(
+                    f"bwd_context.actvgrad not equal to micro_batch_number: {len(bwd_context.actvgrad)}, {self.micro_batch_number}")
             for metric_name in self.ops:
-                write_metrics_tensorboard(metric_name, self.summary_writer, bwd_context.actvgrad, step)
+                write_metrics_tensorboard(
+                    metric_name, self.summary_writer, bwd_context.actvgrad, step)
             bwd_context.actvgrad.clear()
 
+    def write_grad_tb(self, step):
+        if not self.wg_distribution:
+            return
+
+        for name in self.param2name.values():
+            context = self.grad_context[name]
+            write_metrics_tensorboard(
+                'norm', self.summary_writer, context.pre, step)
+            write_metrics_tensorboard(
+                'norm', self.summary_writer, context.post, step)
+            context.reset()
+
     def hook_optimizer(self):
         # in DDP by default use params_have_main_grad
         def optimizer_pre_step_hook(optimizer, args, kwargs):
             context = self.optimizer_context[optimizer]
             if self.print_struct and not all(value == {} for value in self.module_struct.values()) and not self.struct_printed:
                 self._smallest_rank_print("> module struct:")
-                self._smallest_rank_print(json.dumps(self.module_struct, indent=4))
+                self._smallest_rank_print(
+                    json.dumps(self.module_struct, indent=4))
                 self.struct_printed = True
                 if not self.cc_log_only:
-                    raise Exception("exit after first step when print model struct")
+                    raise Exception(
+                        "exit after first step when printing model struct")
             if self.cc_log_only and context.step > 0:
-                self._smallest_rank_print("> Used communication ops and corresponding stack")
-                self._smallest_rank_print(json.dumps({k:[i.split(';') for i in v] for k,v in self.cc_logged_stack.items()}, indent=4))
+                self._smallest_rank_print(
+                    "> Used communication ops and corresponding stack")
+                self._smallest_rank_print(json.dumps(
+                    {k: [i.split(';') for i in v] for k, v in self.cc_logged_stack.items()}, indent=4))
                 raise Exception("exit after first step when print cc stack")
 
             context.param_exp_avg, context.param_exp_avg_sq, context.param_adam_update, context.param_adam_ratio = self.mix_precision_optimizer_mon.fetch_mv(self,
-                optimizer, self.param2name)
-
+                optimizer, self.param2name)
+
+            rank = dist.get_rank() if dist.is_initialized() else None
             for param, name in self.param2name.items():
                 if "params_effrank" in self.config and name in self.config["params_effrank"]:
-                    context.param_effective_rank[name] = eff_rank(param.detach())
+                    context.param_effective_rank[name] = eff_rank(
+                        param.detach())
                 grad = param.main_grad if self.params_have_main_grad else param.grad
                 if grad is None:
-                    print_warn_log(f"grad is None: {name}, maybe something wrong happened.")
+                    print_warn_log(
+                        f"grad is None: {name}, maybe something wrong happened.")
                    continue
                 if self.wg_distribution:
-                    context.param_weight_grad[name] = grad
-                if self.mg_direction:
+                    metric_dict = {}
+                    key = get_summary_writer_tag_name(name, 'post_grad', rank)
+                    for metric_name in self.ops:
+                        metric_dict[metric_name] = get_metrics(
+                            metric_name, {key: grad}, self.eps)
+                    self.grad_context[name].post.append(metric_dict)
+
+                    metric_dict = {}
+                    key = get_summary_writer_tag_name(name, 'pre_grad', rank)
+                    for metric_name in self.ops:
+                        metric_dict[metric_name] = get_metrics(
+                            metric_name, {key: self.grad_context[name].grad_acc}, self.eps)
+                    self.grad_context[name].pre.append(metric_dict)
+
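+                # mg_direction: the fraction of gradient elements whose sign
+                # matches Adam's first moment (exp_avg); defined as 1.0 at
+                # step 0, when no momentum exists yet.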
+                if self.mg_direction:
                     if context.step == 0:
                         same_direction_ratio = torch.tensor(1.)
                     else:
-                        same_direction_ratio = get_sign_matches(grad, context.param_exp_avg[name])
+                        same_direction_ratio = get_sign_matches(
+                            grad, context.param_exp_avg[name])
                     context.param_mg_direction[name] = same_direction_ratio
 
             tbtag_tensor_map = {}
-            if self.wg_distribution:
-                tbtag_tensor_map.update(self.generate_param_metrics('weight_grad', context.param_weight_grad))
             if self.mv_distribution:
-                tbtag_tensor_map.update(self.generate_param_metrics('exp_avg', context.param_exp_avg))
-                tbtag_tensor_map.update(self.generate_param_metrics('exp_avg_sq', context.param_exp_avg_sq))
+                tbtag_tensor_map.update(self.generate_param_metrics(
+                    'exp_avg', context.param_exp_avg))
+                tbtag_tensor_map.update(self.generate_param_metrics(
+                    'exp_avg_sq', context.param_exp_avg_sq))
             if self.mg_direction:
-                tbtag_tensor_map.update(self.generate_param_metrics('mg_direction', context.param_mg_direction))
+                tbtag_tensor_map.update(self.generate_param_metrics(
+                    'mg_direction', context.param_mg_direction))
             # if not tbtag_tensor_map:
             #     return
             metric_dict = {}
             for metric_name in self.ops:
-                metric_dict[metric_name] = get_metrics(metric_name, tbtag_tensor_map, self.eps)
+                metric_dict[metric_name] = get_metrics(
+                    metric_name, tbtag_tensor_map, self.eps)
             for k, c in self.cc_context.items():
                 c.aggregate()
                 cc_metrics = self.generate_cc_metrics(k, c)
@@ -309,18 +388,22 @@
             rank = dist.get_rank() if dist.is_initialized() else None
 
             self.write_xy_tb(context.step)
+            self.write_grad_tb(context.step)
             self.write_adhoc_check(context.step)
             if self.ur_distribution:
                 for param_name, _ in context.param_adam_update.items():
-                    self.update_heatmap_visualizer[param_name].visualize(get_summary_writer_tag_name(param_name, 'adam_update', rank), context.step, self.summary_writer)
+                    self.update_heatmap_visualizer[param_name].visualize(get_summary_writer_tag_name(
+                        param_name, 'adam_update', rank), context.step, self.summary_writer)
                 for param_name, _ in context.param_adam_ratio.items():
-                    self.ratio_heatmap_visualizer[param_name].visualize(get_summary_writer_tag_name(param_name, 'adam_ratio', rank), context.step, self.summary_writer)
+                    self.ratio_heatmap_visualizer[param_name].visualize(get_summary_writer_tag_name(
+                        param_name, 'adam_ratio', rank), context.step, self.summary_writer)
 
             for metric_name in self.ops:
                 if not context.metric_list:
                     break
-                write_metrics_tensorboard(metric_name, self.summary_writer, context.metric_list, context.step)
+                write_metrics_tensorboard(
+                    metric_name, self.summary_writer, context.metric_list, context.step)
             context.metric_list.clear()
             context.step += 1
@@ -359,19 +442,26 @@
                 context.set_format_by_arg('output', self.config['targets'])
             if not context.verified:
                 if not context.ignore_in:
-                    context.focused_in_col = validate_config_spec(context.format_by_arg['input'], module_input, context.module_name, 'input')
-                context.focused_out_col = validate_config_spec(context.format_by_arg['output'], module_output, context.module_name, 'output')
+                    context.focused_in_col = validate_config_spec(
+                        context.format_by_arg['input'], module_input, context.module_name, 'input')
+                context.focused_out_col = validate_config_spec(
+                    context.format_by_arg['output'], module_output, context.module_name, 'output')
                 context.verified = True
             # expect output be tensor type
             tbtag_tensor_map = {}
             if not context.ignore_in:
-                cared_input = module_input if context.focused_in_col is None else module_input[context.focused_in_col]
-                tbtag_tensor_map.update(self.build_tbtag_tensor_map(context.module_name, 'input', cared_input))
+                cared_input = module_input if context.focused_in_col is None else module_input[context.focused_in_col]
+                tbtag_tensor_map.update(self.build_tbtag_tensor_map(
+                    context.module_name, 'input', cared_input))
-            cared_output = module_output if context.focused_out_col is None else module_output[context.focused_out_col]
-            tbtag_tensor_map.update(self.build_tbtag_tensor_map(context.module_name, 'output', cared_output))
+            cared_output = module_output if context.focused_out_col is None else module_output[context.focused_out_col]
+            tbtag_tensor_map.update(self.build_tbtag_tensor_map(
+                context.module_name, 'output', cared_output))
             metric_dict = {}
             for metric_name in self.ops:
-                metric_dict[metric_name] = get_metrics(metric_name, tbtag_tensor_map, self.eps)
+                metric_dict[metric_name] = get_metrics(
+                    metric_name, tbtag_tensor_map, self.eps)
             if context.micro_step == 0 and context.actv:
                 print_warn_log(
                     f"actv context of {context.module_name} is not empty when first micro_step, maybe something wrong happened. Now clear it.")
@@ -394,24 +484,33 @@
                 return
             if not context.format_by_arg:
                 context.set_format_by_arg('input_grad', self.config['targets'])
-                context.set_format_by_arg('output_grad', self.config['targets'])
+                context.set_format_by_arg(
+                    'output_grad', self.config['targets'])
             if not context.verified:
                 if not context.ignore_in:
-                    context.focused_in_col = validate_config_spec(context.format_by_arg['input_grad'], input_grad, context.module_name, 'input_grad')
-                context.focused_out_col = validate_config_spec(context.format_by_arg['output_grad'], output_grad, context.module_name, 'output_grad')
+                    context.focused_in_col = validate_config_spec(
+                        context.format_by_arg['input_grad'], input_grad, context.module_name, 'input_grad')
+                context.focused_out_col = validate_config_spec(
+                    context.format_by_arg['output_grad'], output_grad, context.module_name, 'output_grad')
                 context.verified = True
 
             tbtag_tensor_map = {}
             if not context.ignore_in:
-                cared_input_grad = input_grad if context.focused_in_col is None else input_grad[context.focused_in_col]
-                tbtag_tensor_map.update(self.build_tbtag_tensor_map(context.module_name, 'input_grad', cared_input_grad))
-            cared_output_grad = output_grad if context.focused_out_col is None else output_grad[context.focused_out_col]
-            tbtag_tensor_map.update(self.build_tbtag_tensor_map(context.module_name, 'output_grad', cared_output_grad))
+                cared_input_grad = input_grad if context.focused_in_col is None else input_grad[context.focused_in_col]
+                tbtag_tensor_map.update(self.build_tbtag_tensor_map(
+                    context.module_name, 'input_grad', cared_input_grad))
+            cared_output_grad = output_grad if context.focused_out_col is None else output_grad[context.focused_out_col]
+            tbtag_tensor_map.update(self.build_tbtag_tensor_map(
+                context.module_name, 'output_grad', cared_output_grad))
             metric_dict = {}
             for metric_name in self.ops:
-                metric_dict[metric_name] = get_metrics(metric_name, tbtag_tensor_map, self.eps)
+                metric_dict[metric_name] = get_metrics(
+                    metric_name, tbtag_tensor_map, self.eps)
             if context.micro_step == 0 and context.actvgrad:
-                print_warn_log(f"actvgrad context of {context.module_name} is not empty when first micro_step, maybe something wrong happened. Now clear it.")
+                print_warn_log(
+                    f"actvgrad context of {context.module_name} is not empty when first micro_step, maybe something wrong happened. Now clear it.")
                 context.actvgrad.clear()
             context.actvgrad.append(metric_dict)
Now clear it.") context.actvgrad.clear() context.actvgrad.append(metric_dict) @@ -426,10 +525,35 @@ class TrainerMon: self.module_struct[name] = {} if name in target_names: submodule.register_forward_hook(fwd_hook_fun) - self.module_fwd_hook_context_by_module[submodule] = ModuleHookContext(name) + self.module_fwd_hook_context_by_module[submodule] = ModuleHookContext( + name) if not self.forward_only: submodule.register_full_backward_hook(bwd_hook_fun) - self.module_bwd_hook_context_by_module[submodule] = ModuleHookContext(name) + self.module_bwd_hook_context_by_module[submodule] = ModuleHookContext( + name) print_rank_0(f"> {name} is monitored successfully") hooked_count += 1 return hooked_count + + def _hook_weights(self, model): + self.wg_distribution = True + rank = dist.get_rank() if dist.is_initialized() else None + + def param_hook(grad, context): + with torch.no_grad(): + context.grad_acc += grad + + if self.print_struct: + self.module_struct = { + module_name: 1. for module_name, module in model.named_modules()} + return + + for name, param in model.named_parameters(): + for target in self.config['targets'].keys(): + context = self.grad_context[name] + if name.startswith(target) and param.requires_grad: + self._smallest_rank_print(f'>> monitoring: {name}') + self.param2name[param] = name + param.register_hook( + partial(param_hook, context=context)) + context.grad_acc = torch.zeros_like(param).to(DEVICE) diff --git a/debug/accuracy_tools/kj600/kj600/module_metric.py b/debug/accuracy_tools/kj600/kj600/module_metric.py index e09536b072cf7953e6b6106420936416d4264d0e..15ca149215be608104bdc39f6089a5d5dbe2544c 100644 --- a/debug/accuracy_tools/kj600/kj600/module_metric.py +++ b/debug/accuracy_tools/kj600/kj600/module_metric.py @@ -4,7 +4,7 @@ import statistics from kj600.features import square_sum, get_max, get_min, get_zeros, get_nans, get_norm -def get_summary_writer_tag_name(module_or_param_name:str, tag:str, rank): +def get_summary_writer_tag_name(module_or_param_name: str, tag: str, rank): if rank is None: return f"{module_or_param_name}/{tag}" else: @@ -23,13 +23,15 @@ def register_config_metric(key, cls=None): config_metric_registry[key] = cls return cls + class TensorMetrics: def __init__(self) -> None: - self.metrics = {} #tensor_tag --> [] + self.metrics = {} # tensor_tag --> [] self.cur_idx = {} fun_map = {"norm": get_norm, "max": get_max, "min": get_min} - #get stats and insert into metrics dictionary + # get stats and insert into metrics dictionary + def stat_insert(self, tensor, stat_ops, module_name, tensor_name, rank, eps=1e-8): prefix = get_summary_writer_tag_name(module_name, tensor_name, rank) for stat_op in stat_ops: @@ -44,9 +46,11 @@ class TensorMetrics: for key, metric_list in self.metrics.items(): start = self.cur_idx[key] for v in metric_list[start:]: - tb_writer.add_scalar(key, v.item(), global_step=self.cur_idx[key]) + tb_writer.add_scalar( + key, v.item(), global_step=self.cur_idx[key]) self.cur_idx[key] += 1 + class Metric(object): @staticmethod def get_metric_value(tensor, eps): @@ -62,6 +66,7 @@ class Metric(object): metrics_dict[tag] = self.get_metric_value(tensor, eps) return metrics_dict + @register_config_metric("min") class MinMetric(Metric): @staticmethod @@ -71,7 +76,8 @@ class MinMetric(Metric): @staticmethod def metric_tensorboard(metric_name, summary_writer, metric_value, step): for key in metric_value[0][metric_name].keys(): - min_value = min([item[metric_name][key].item() for item in metric_value]) + min_value = 
+            with torch.no_grad():
+                context.grad_acc += grad
+
+        if self.print_struct:
+            self.module_struct = {
+                module_name: 1. for module_name, module in model.named_modules()}
+            return
+
+        for name, param in model.named_parameters():
+            for target in self.config['targets'].keys():
+                if name.startswith(target) and param.requires_grad:
+                    context = self.grad_context[name]
+                    self._smallest_rank_print(f'>> monitoring: {name}')
+                    self.param2name[param] = name
+                    param.register_hook(partial(param_hook, context=context))
+                    context.grad_acc = torch.zeros_like(param).to(DEVICE)
+                    # stop at the first matching target so a parameter is
+                    # never hooked (and its gradient counted) twice
+                    break
diff --git a/debug/accuracy_tools/kj600/kj600/module_metric.py b/debug/accuracy_tools/kj600/kj600/module_metric.py
index e09536b072cf7953e6b6106420936416d4264d0e..15ca149215be608104bdc39f6089a5d5dbe2544c 100644
--- a/debug/accuracy_tools/kj600/kj600/module_metric.py
+++ b/debug/accuracy_tools/kj600/kj600/module_metric.py
@@ -4,7 +4,7 @@ import statistics
 from kj600.features import square_sum, get_max, get_min, get_zeros, get_nans, get_norm
 
 
-def get_summary_writer_tag_name(module_or_param_name:str, tag:str, rank):
+def get_summary_writer_tag_name(module_or_param_name: str, tag: str, rank):
     if rank is None:
         return f"{module_or_param_name}/{tag}"
     else:
@@ -23,13 +23,15 @@ def register_config_metric(key, cls=None):
         config_metric_registry[key] = cls
         return cls
 
+
 class TensorMetrics:
     def __init__(self) -> None:
-        self.metrics = {} #tensor_tag --> []
+        self.metrics = {}  # tensor_tag --> []
         self.cur_idx = {}
 
     fun_map = {"norm": get_norm, "max": get_max, "min": get_min}
-    #get stats and insert into metrics dictionary
+
+    # get stats and insert them into the metrics dictionary
     def stat_insert(self, tensor, stat_ops, module_name, tensor_name, rank, eps=1e-8):
         prefix = get_summary_writer_tag_name(module_name, tensor_name, rank)
         for stat_op in stat_ops:
@@ -44,9 +46,11 @@
         for key, metric_list in self.metrics.items():
             start = self.cur_idx[key]
             for v in metric_list[start:]:
-                tb_writer.add_scalar(key, v.item(), global_step=self.cur_idx[key])
+                tb_writer.add_scalar(
+                    key, v.item(), global_step=self.cur_idx[key])
                 self.cur_idx[key] += 1
 
+
 class Metric(object):
     @staticmethod
     def get_metric_value(tensor, eps):
@@ -62,6 +66,7 @@
             metrics_dict[tag] = self.get_metric_value(tensor, eps)
         return metrics_dict
 
+
 @register_config_metric("min")
 class MinMetric(Metric):
     @staticmethod
@@ -71,7 +76,8 @@
     @staticmethod
     def metric_tensorboard(metric_name, summary_writer, metric_value, step):
         for key in metric_value[0][metric_name].keys():
-            min_value = min([item[metric_name][key].item() for item in metric_value])
+            min_value = min([item[metric_name][key].item()
+                             for item in metric_value])
             summary_writer.add_scalar(f'{key}_min', min_value, step)
 
 
@@ -84,7 +90,8 @@
     @staticmethod
     def metric_tensorboard(metric_name, summary_writer, metric_value, step):
         for key in metric_value[0][metric_name].keys():
-            max_value = max([item[metric_name][key].item() for item in metric_value])
+            max_value = max([item[metric_name][key].item()
+                             for item in metric_value])
             summary_writer.add_scalar(f'{key}_max', max_value, step)
 
 
@@ -97,7 +104,8 @@
     @staticmethod
     def metric_tensorboard(metric_name, summary_writer, metric_value, step):
         for key in metric_value[0][metric_name].keys():
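+            # Assumes get_metric_value for "norm" yields a squared sum per
+            # micro batch, so the square root of the sum over micro batches
+            # gives the norm for the whole step.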
-            norm_value = math.sqrt(sum([item[metric_name][key].item() for item in metric_value]))
+            norm_value = math.sqrt(
+                sum([item[metric_name][key].item() for item in metric_value]))
             summary_writer.add_scalar(f'{key}_norm', norm_value, step)
 
 
@@ -110,21 +118,25 @@
     @staticmethod
     def metric_tensorboard(metric_name, summary_writer, metric_value, step):
         for key in metric_value[0][metric_name].keys():
-            zeros_value = statistics.mean([item[metric_name][key].item() for item in metric_value])
+            zeros_value = statistics.mean(
+                [item[metric_name][key].item() for item in metric_value])
             summary_writer.add_scalar(f'{key}_zeros', zeros_value, step)
 
+
 @register_config_metric("nans")
 class NaNsMetric(Metric):
     @staticmethod
     def get_metric_value(t, eps):
         return get_nans(t)
-
+
     @staticmethod
     def metric_tensorboard(metric_name, summary_writer, metric_value, step):
         for key in metric_value[0][metric_name].keys():
-            nans_value = sum([v[metric_name][key].item() for v in metric_value])
+            nans_value = sum([v[metric_name][key].item()
+                              for v in metric_value])
             summary_writer.add_scalar(f'{key}_nans', nans_value, step)
 
+
 @register_config_metric("id")
 class IdentMetric(Metric):
     @staticmethod
@@ -134,12 +146,14 @@
         return tensor
 
     @staticmethod
-    def metric_tensorboard(metric_name, summary_writer, metric_value, step): #metric_value is a dict, key is parameter name and value is a list of scalar tensor
+    # metric_value is a dict: key is the parameter name, value is a list of scalar tensors
+    def metric_tensorboard(metric_name, summary_writer, metric_value, step):
         if len(metric_value) == 1:
             for key, value in metric_value[0][metric_name].items():
                 if not value:
                     continue
-                summary_writer.add_scalar(f'{key}_identical', value.item(), step)
+                summary_writer.add_scalar(
+                    f'{key}_identical', value.item(), step)
 
 
 def get_metrics(metric_name, tag2tensor, eps):
@@ -147,7 +161,8 @@
     try:
         fun_metric = config_metric_registry[metric_name]
         return fun_metric().get_metrics(tag2tensor, eps)
     except KeyError as e:
-        raise ValueError(f"Not supported this metric, expected metric: {config_metric_registry.keys()}, actual metric: {metric_name}") from e
+        raise ValueError(
+            f"Unsupported metric: {metric_name}; expected one of: {config_metric_registry.keys()}") from e
 
 
@@ -155,4 +170,5 @@ def write_metrics_tensorboard(metric_name, summary_writer, metric_value, step):
     try:
         fun_metric = config_metric_registry[metric_name]
         return fun_metric.metric_tensorboard(metric_name, summary_writer, metric_value, step)
     except KeyError as e:
-        raise ValueError(f"Not supported this metric, expected metric: {config_metric_registry.keys()}, actual metric: {metric_name}") from e
+        raise ValueError(
+            f"Unsupported metric: {metric_name}; expected one of: {config_metric_registry.keys()}") from e