diff --git a/profiler/msprof_analyze/precheck/env_check/cpu_check.py b/profiler/msprof_analyze/precheck/env_check/cpu_check.py
index 66ec98e11b10d74c00e9735e87a557305e0bef33..456021eaef0959d5498b92960116ee6e8cb63f7e 100644
--- a/profiler/msprof_analyze/precheck/env_check/cpu_check.py
+++ b/profiler/msprof_analyze/precheck/env_check/cpu_check.py
@@ -22,6 +22,7 @@ import torch
 import torch_npu
 
 from msprof_analyze.precheck.env_check.analyze import TimeAnalyze
+from msprof_analyze.precheck.common.file import File, FileOpen, FdOpen
 from msprof_analyze.precheck.env_check.environment_check import HardwareCheck
 from msprof_analyze.precheck.distributed_cluster.distributed_cluster_base import DistributedClusterBase
 
@@ -46,8 +47,8 @@ class CPUCheck(HardwareCheck):
         for filename in os.listdir(data_path):
             if filename.endswith(".json"):
                 filepath = os.path.join(data_path, filename)
-                with open(filepath, 'r') as f:
-                    data = json.load(f)
+                with FileOpen(filepath, 'r') as f:
+                    data = json.load(f.file_reader)
                 time_all.update(data)
 
         # 分析输出
@@ -70,15 +71,16 @@ class CPUCheck(HardwareCheck):
 
         else:
             logger.info(f"CPUs are working well, no performance issues found.")
-
+
+        logger.info("===Finishing cpu check===")
         logger.removeHandler(handler)
 
-    def cpu_matmul(self, cpu_id):
+    def cpu_matmul(self):
         # 读取矩阵参数
         env_check_dir = os.path.dirname(__file__)
         yaml_file = os.path.join(env_check_dir, 'matmul_shape.yaml')
-        with open(yaml_file, 'r') as f:
-            shape_dict = yaml.safe_load(f)
+        with FileOpen(yaml_file, 'r') as f:
+            shape_dict = yaml.safe_load(f.file_reader)
         batch_size = shape_dict['batch_size']
         seq_len = shape_dict['seq_len']
         hidden_size = shape_dict['hidden_size']
@@ -86,34 +88,35 @@ class CPUCheck(HardwareCheck):
 
         # 执行多次矩阵运算:mat_c + mat_a × mat_b
         for _ in range(10):
-            mat_a = torch.randn(batch_size, seq_len, hidden_size).to(f'cpu:{cpu_id}')
-            mat_b = torch.randn(batch_size, hidden_size, intermediate_size).to(f'cpu:{cpu_id}')
-            mat_c = torch.randn(seq_len, intermediate_size).to(f'cpu:{cpu_id}')
+            mat_a = torch.randn(batch_size, seq_len, hidden_size).to(f'cpu')
+            mat_b = torch.randn(batch_size, hidden_size, intermediate_size).to(f'cpu')
+            mat_c = torch.randn(seq_len, intermediate_size).to(f'cpu')
             torch.addbmm(mat_c, mat_a, mat_b)
 
     def collect(self, data_path: str):
+        if self.local_rank == 0:
+            logging.info("===Starting cpu check===")
         data_path = os.path.join(data_path, f"cpucheck")
         cpu_ids = os.cpu_count()
         torch.set_num_threads(cpu_ids)
 
-        for cpu_id in range(cpu_ids):
-            # 创建事件对象,用于记录运算的开始和结束时间
-            start_event = torch_npu.npu.Event(enable_timing=True)
-            end_event = torch_npu.npu.Event(enable_timing=True)
+        # 创建事件对象,用于记录运算的开始和结束时间
+        start_event = torch_npu.npu.Event(enable_timing=True)
+        end_event = torch_npu.npu.Event(enable_timing=True)
 
-            start_event.record()
-            self.cpu_matmul(cpu_id)
-            end_event.record()
+        start_event.record()
+        self.cpu_matmul()
+        end_event.record()
 
-            # 同步当前流,确保全部运算均已完成
-            torch_npu.npu.current_stream().synchronize()
-            cpu_time = start_event.elapsed_time(end_event)
-            self.output[cpu_id] = cpu_time
+        # 同步当前流,确保全部运算均已完成
+        torch_npu.npu.current_stream().synchronize()
+        cpu_time = start_event.elapsed_time(end_event)
+        self.output[self.node_rank] = cpu_time
 
         # 数据落盘
         json_path = os.path.join(data_path, f"cpucheck_{self.node_rank}.json")
         os.makedirs(os.path.dirname(json_path), exist_ok=True)
-        with open(json_path, 'w') as json_file:
+        with FdOpen(json_path, 'w') as json_file:
             json.dump(self.output, json_file, ensure_ascii=False, indent=4)
 
 
diff --git a/profiler/msprof_analyze/precheck/env_check/npu_check.py b/profiler/msprof_analyze/precheck/env_check/npu_check.py
index 3488c598fb17416d6976933be97f3e037e0a4338..920bf37b787c500ff9efac43823c022e31b57564 100644
--- a/profiler/msprof_analyze/precheck/env_check/npu_check.py
+++ b/profiler/msprof_analyze/precheck/env_check/npu_check.py
@@ -22,6 +22,7 @@ import torch
 import torch_npu
 
 from msprof_analyze.precheck.env_check.analyze import TimeAnalyze
+from msprof_analyze.precheck.common.file import File, FileOpen, FdOpen
 from msprof_analyze.precheck.env_check.environment_check import HardwareCheck
 from msprof_analyze.precheck.distributed_cluster.distributed_cluster_base import DistributedClusterBase
 
@@ -45,8 +46,8 @@ class NPUCheck(HardwareCheck):
         for filename in os.listdir(data_path):
             if filename.endswith(".json"):
                 filepath = os.path.join(data_path, filename)
-                with open(filepath, 'r') as f:
-                    data = json.load(f)
+                with FileOpen(filepath, 'r') as f:
+                    data = json.load(f.file_reader)
                 time_all.update(data)
 
         # 分析输出
@@ -70,14 +71,15 @@ class NPUCheck(HardwareCheck):
         else:
             logger.info(f"NPUs are working well, no performance issues found.")
 
+        logger.info("===Finishing npu check===")
         logger.removeHandler(handler)
 
     def npu_matmul(self, npu_id):
         # 读取矩阵参数
        env_check_dir = os.path.dirname(__file__)
         yaml_file = os.path.join(env_check_dir, 'matmul_shape.yaml')
-        with open(yaml_file, 'r') as f:
-            shape_dict = yaml.safe_load(f)
+        with FileOpen(yaml_file, 'r') as f:
+            shape_dict = yaml.safe_load(f.file_reader)
         batch_size = shape_dict['batch_size']
         seq_len = shape_dict['seq_len']
         hidden_size = shape_dict['hidden_size']
@@ -91,6 +93,8 @@ class NPUCheck(HardwareCheck):
         torch.addbmm(mat_c, mat_a, mat_b)
 
     def collect(self, data_path: str):
+        if self.local_rank == 0:
+            logging.info("===Starting npu check===")
         data_path = os.path.join(data_path, f"npucheck")
         # 创建事件对象,用于记录运算的开始和结束时间
         start_event = torch_npu.npu.Event(enable_timing=True)
@@ -108,7 +112,7 @@ class NPUCheck(HardwareCheck):
         # 数据落盘
         json_path = os.path.join(data_path, f"npucheck_{self.rank}.json")
         os.makedirs(os.path.dirname(json_path), exist_ok=True)
-        with open(json_path, 'w') as json_file:
+        with FdOpen(json_path, 'w') as json_file:
             json.dump(self.output, json_file, ensure_ascii=False, indent=4)
 
 