diff --git a/profiler/msprof_analyze/precheck/env_check/analyze.py b/profiler/msprof_analyze/precheck/env_check/analyze.py
index 560d368a813730f3c6b9a82e0eeb99f93f293b4c..bb8288d5af454ca0f897ef234761ac1cd4dfacb9 100644
--- a/profiler/msprof_analyze/precheck/env_check/analyze.py
+++ b/profiler/msprof_analyze/precheck/env_check/analyze.py
@@ -12,21 +12,23 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import logging
+
+import logging
+import torch_npu
 
 from msprof_analyze.precheck.common.constant import Constant
 
 
 class TimeAnalyze():
-    def __init__(self, time):
-        self.time = time
+    def __init__(self, time_data):
+        self.time_data = time_data
 
     def time_analyze(self):
-        if not self.time:
-            logging.ERROR("self.time is undefined.")
-            return None
+        if not self.time_data:
+            logging.error("Running time is empty.")
+            return None, None, None, None
 
-        time_list = list(self.time.values())
+        time_list = list(self.time_data.values())
         slow_rank = None
         slow_time = None
         mean_time = 0
@@ -34,7 +36,7 @@ class TimeAnalyze():
 
         # index and value of the slowest rank
         slow_time = max(time_list)
-        slow_rank = max(self.time, key=self.time.get)
+        slow_rank = max(self.time_data, key=self.time_data.get)
 
         # compute the gap between the fastest and slowest ranks
         try:
@@ -53,4 +55,24 @@ class TimeAnalyze():
         else:
             isproblem = False
 
-        return slow_rank, slow_time, max_ratio, isproblem
\ No newline at end of file
+        return slow_rank, slow_time, max_ratio, isproblem
+
+
+class Timer:
+    """NPU-event-based timer; delta is reported in seconds."""
+
+    def __init__(self):
+        self.start_time = None
+        self.end_time = None
+        self.delta = 0
+
+    def start(self):
+        self.start_time = torch_npu.npu.Event(enable_timing=True)
+        self.end_time = torch_npu.npu.Event(enable_timing=True)
+        self.start_time.record()
+
+    def stop(self):
+        self.end_time.record()
+        torch_npu.npu.synchronize()
+        # elapsed_time() returns milliseconds; convert to seconds
+        self.delta = self.start_time.elapsed_time(self.end_time) / 1000
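For reviewers, a minimal usage sketch of the renamed TimeAnalyze and the new Timer. It assumes an initialized NPU context; npu_workload() is a hypothetical stand-in for the operation being timed, and the slow/fast threshold logic lives in the elided hunk:

    from msprof_analyze.precheck.env_check.analyze import TimeAnalyze, Timer

    timer = Timer()
    timer.start()
    npu_workload()  # hypothetical workload being timed on the NPU
    timer.stop()    # timer.delta now holds the elapsed time in seconds

    # rank id -> measured time, as collect() would gather it across ranks
    rank_times = {0: timer.delta, 1: 1.02, 2: 0.98, 3: 1.74}
    slow_rank, slow_time, max_ratio, isproblem = TimeAnalyze(rank_times).time_analyze()
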
diff --git a/profiler/msprof_analyze/precheck/env_check/communication_check.py b/profiler/msprof_analyze/precheck/env_check/communication_check.py
index 4704a3a262458dde0decb35f24b08d7014dfe92c..7de5609ebaa57d50222780c5f17f5f890848850a 100644
--- a/profiler/msprof_analyze/precheck/env_check/communication_check.py
+++ b/profiler/msprof_analyze/precheck/env_check/communication_check.py
@@ -12,6 +12,18 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+import os
+import argparse
+import json
+import logging
+
+import torch
+import torch_npu
+from torch import distributed as dist
+
+from msprof_analyze.precheck.distributed_cluster.distributed_cluster_base import DistributedClusterBase
 from msprof_analyze.precheck.env_check.environment_check import HardwareCheck
+from msprof_analyze.precheck.env_check.analyze import Timer
 
 
@@ -20,9 +32,242 @@ class CommunicationCheck(HardwareCheck):
 
     def __init__(self, args):
         super().__init__(args)
+        self.world_size = args.world_size
+        self.output = args.output
+        self.tp = args.tensor_model_parallel_size
+        self.pp = args.pipeline_model_parallel_size
+        self.cp = args.context_model_parallel_size
+        self.ep = args.expert_model_parallel_size
+        self.validate_parallel_strategy()
+        self.initialize()
+        self.rank = dist.get_rank()
+        self.no_share_storage = args.no_shared_storage
+        self.tmp_path = '/tmp/p2p_test'
+        self.group_names_list = []
+
+    def validate_parallel_strategy(self):
+        if self.tp * self.pp * self.cp * self.ep == 0:
+            raise ValueError(
+                f"The parallel strategy is invalid, every size must be non-zero: "
+                f"tp:{self.tp}, pp:{self.pp}, cp:{self.cp}, ep:{self.ep}"
+            )
+
+    def initialize(self):
+        self.dcb = DistributedClusterBase()
+        if not dist.is_initialized():
+            self.dcb.initialize_cluster_distributed()
+        self.dcb.initialize_communication_group(self.tp, self.pp, self.cp, self.ep)
+
+    def perform_p2p_communication_async(self, comm_ranks, tensor_send, tensor_recv, proc_group):
+        """
+        Perform point-to-point communication asynchronously.
+        """
+        send_ops = []
+        recv_ops = []
+        if not self.is_pipeline_last_rank(comm_ranks):
+            send_ops.append(
+                dist.isend(
+                    tensor=tensor_send,
+                    dst=self.get_rank_to_next(ranks=comm_ranks),
+                    group=proc_group
+                )
+            )
+
+        if not self.is_pipeline_first_rank(comm_ranks):
+            # buffer for the tensor arriving from the previous rank
+            tensor_recv_next = torch.empty(tensor_send.size(), dtype=torch.float32).npu()
+            recv_ops.append(
+                dist.irecv(
+                    tensor=tensor_recv_next,
+                    src=self.get_rank_to_prev(ranks=comm_ranks),
+                    group=proc_group
+                )
+            )
+
+        # wait until the receive operations complete
+        for op in recv_ops:
+            op.wait()
+
+        if not self.is_pipeline_first_rank(comm_ranks):
+            send_ops.append(
+                dist.isend(
+                    tensor=tensor_recv_next,
+                    dst=self.get_rank_to_prev(ranks=comm_ranks),
+                    group=proc_group
+                )
+            )
+
+        if not self.is_pipeline_last_rank(comm_ranks):
+            recv_ops.append(
+                dist.irecv(
+                    tensor=tensor_recv,
+                    src=self.get_rank_to_next(ranks=comm_ranks),
+                    group=proc_group
+                )
+            )
+
+        # wait until all remaining send and receive operations complete
+        for op in send_ops + recv_ops:
+            op.wait()
+
+    def get_dp_size(self):
+        # data-parallel size is world_size divided by the product of tp, pp and cp
+        return self.world_size // (self.tp * self.pp * self.cp)
+
+    def comm_group_destroy(self):
+        """destroy the communication groups"""
+        self.dcb.destroy_comm()
+
+    def get_rank_to_next(self, ranks):
+        cur_index = ranks.index(dist.get_rank())
+        return ranks[(cur_index + 1) % len(ranks)]
+
+    def get_rank_to_prev(self, ranks):
+        cur_index = ranks.index(dist.get_rank())
+        return ranks[(cur_index - 1) % len(ranks)]
+
+    def get_comm_group(self, group_ranks):
+        if group_ranks is None:
+            raise ValueError("The group is None, please check again")
+        for sub in group_ranks:
+            if dist.get_rank() in sub:
+                return sub
+        raise ValueError("Failed to find the current group")
+
+    def is_pipeline_first_rank(self, group):
+        return dist.get_rank() == group[0]
+
+    def is_pipeline_last_rank(self, group):
+        return dist.get_rank() == group[-1]
+
+    def comm_group_create(self):
+        group_names_list = []
+        group_info_dict = {}
+        if self.get_dp_size() > 1:
+            group_names_list.append("dp")
+            group_info_dict["dp"] = [self.dcb.get_data_parallel_group_ranks(),
+                                     self.dcb.get_data_parallel_group()]
+        if self.tp > 1:
+            group_names_list.append("tp")
+            group_info_dict["tp"] = [self.dcb.get_tensor_parallel_group_ranks(),
+                                     self.dcb.get_tensor_parallel_group()]
+        if self.pp > 1:
+            group_names_list.append("pp")
+            group_info_dict["pp"] = [self.dcb.get_pipeline_parallel_group_ranks(),
+                                     self.dcb.get_pipeline_parallel_group()]
+        if self.ep > 1:
+            group_names_list.append("ep")
+            group_info_dict["ep"] = [self.dcb.get_expert_parallel_group_ranks(),
+                                     self.dcb.get_expert_parallel_group()]
+        if self.cp > 1:
+            group_names_list.append("cp")
+            group_info_dict["cp"] = [self.dcb.get_context_parallel_group_ranks(),
+                                     self.dcb.get_context_parallel_group()]
+        self.group_names_list = group_names_list
+        return group_names_list, group_info_dict
+
+    def get_group_name(self):
+        # populated by comm_group_create(); empty until then
+        return self.group_names_list
+
+    def collect(self, iteration: int = 5, batch_size=(32, 4096, 8192)):
+        """Perform point-to-point communication between ranks in the group"""
+        group_names_list, group_info_dict = self.comm_group_create()
+        local_cost = {"rank": dist.get_rank()}
+        if len(group_names_list) == 0:
+            raise ValueError("The tensor-parallel, pipeline-parallel, data-parallel, "
+                             "expert-parallel and context-parallel sizes are all less than 2, "
+                             "so no peer communication group can be created")
+        tensor_send = torch.randn(batch_size, dtype=torch.float32).npu()
+        tensor_recv = torch.empty(batch_size, dtype=torch.float32).npu()
+        for group_name in group_names_list:
+            ranks, proc_group = group_info_dict[group_name]
+            local_cost[group_name] = []
+            if len(ranks) <= 1:
+                raise ValueError(f"Failed to start communication group {group_name}, "
+                                 f"since the group is {ranks}.")
+            for i in range(iteration):
+                if dist.get_rank() == 0:
+                    logging.info(f">>Start communication: {group_name}, iteration: {i}")
+                tensor_send.uniform_(-1, 1)
+                if i == 0:
+                    dist.barrier()
+                timer = Timer()
+                timer.start()
+                self.perform_p2p_communication_async(ranks, tensor_send, tensor_recv, proc_group)
+                timer.stop()
+                local_cost[group_name].append(timer.delta)
+            local_cost[f"{group_name}_avg_cost"] = sum(local_cost[group_name]) / iteration
+        if self.no_share_storage:
+            self.dump_tmp_file(local_cost, self.tmp_path)
+        else:
+            tmp_dir = os.path.join(self.output, 'tmp')
+            if not os.path.exists(tmp_dir):
+                os.makedirs(tmp_dir)
+            self.dump_tmp_file(local_cost, tmp_dir)
+
+    def get_file_path(self, file_path, file_name):
+        """get the file path of the given file name"""
+        if not os.path.exists(file_path):
+            logging.info(f"path does not exist, creating output dir: {file_path}")
+            os.makedirs(file_path)
+        return os.path.join(file_path, file_name)
+
+    def dump_tmp_file(self, data, save_dir):
+        """save data to file as worker_x_rank_y.json"""
+        cur_rank = dist.get_rank()
+        worker_id = os.getenv("GROUP_RANK")
+        if worker_id is None:
+            raise ValueError("GROUP_RANK environment variable is not set.")
+        if data is None:
+            raise ValueError(f"data is not created by rank {cur_rank}, please check this")
+        dump_path = self.get_file_path(save_dir, f"worker_{worker_id}_rank_{cur_rank}.json")
+        try:
+            with open(dump_path, 'w') as f:
+                json.dump(data, f, indent=4)
+        except Exception as e:
+            logging.error(f"An error occurred while rank {cur_rank} saving data to {dump_path}: {e}")
+
+    def get_json_files(self, file_dir):
+        for root, _, files in os.walk(file_dir):
+            for file in files:
+                if file.endswith('.json'):
+                    yield os.path.join(root, file)
+
+    def analyze(self, save_dir, file_name: str = 'communication_check.json'):
+        """
+        Walk all JSON files under save_dir and merge the values of each key
+        into a single result file.
+        """
+        result = {}
+        group_rank = os.getenv('GROUP_RANK')
+        if group_rank is None or int(group_rank) != 0:
+            return
+        # collect the paths of all JSON files
+        json_files = self.get_json_files(save_dir)
+        for file_path in json_files:
+            try:
+                # open and load the JSON file
+                with open(file_path, 'r', encoding='utf-8') as f:
+                    data = json.load(f)
+                for key in data:
+                    result.setdefault(key, []).append(data[key])
+            except json.JSONDecodeError:
+                logging.error(f"Error decoding JSON in {file_path}")
+            except Exception as e:
+                logging.error(f"An error occurred while processing {file_path}: {e}")
+        with open(os.path.join(self.output, file_name), 'w', encoding='utf-8') as f:
+            json.dump(result, f, indent=4)
+
-    def collect(self):
-        pass
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--world_size", type=int, required=True)
+    parser.add_argument("--output", type=str, default="./")
+    parser.add_argument("--tensor_model_parallel_size", type=int, required=True)
+    parser.add_argument("--pipeline_model_parallel_size", type=int, required=True)
+    parser.add_argument("--context_model_parallel_size", type=int, required=True)
+    parser.add_argument("--expert_model_parallel_size", type=int, required=True)
+    parser.add_argument("--no_shared_storage", action="store_true")
+    args = parser.parse_args()
 
-    def analyze(self):
-        pass
+
+    comm_check = CommunicationCheck(args=args)
+    comm_check.collect()
+    comm_check.comm_group_destroy()
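The new __main__ entry point assumes a torchrun-style launcher, since CommunicationCheck reads RANK, LOCAL_RANK and GROUP_RANK from the environment. A hedged sketch of driving the check programmatically under that assumption, with the 16-NPU, tp=4/pp=4 layout the CLI suggests (all argument values are illustrative):

    import argparse

    # assumes torchrun (or an equivalent launcher) already set RANK/LOCAL_RANK/GROUP_RANK
    args = argparse.Namespace(
        world_size=16,
        output="./",
        tensor_model_parallel_size=4,
        pipeline_model_parallel_size=4,
        context_model_parallel_size=1,
        expert_model_parallel_size=1,
        no_shared_storage=False,
    )
    checker = CommunicationCheck(args=args)
    checker.collect()                  # timed p2p rounds per parallel group
    checker.analyze(save_dir="./tmp")  # worker 0 merges the per-rank JSON dumps
    checker.comm_group_destroy()
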
diff --git a/profiler/msprof_analyze/precheck/env_check/io_check.py b/profiler/msprof_analyze/precheck/env_check/io_check.py
index 91dbd70d73d219822b13fc78c2876a1683c841a4..50a210965a8bc1114ca2b2c129130ee2d4a89324 100644
--- a/profiler/msprof_analyze/precheck/env_check/io_check.py
+++ b/profiler/msprof_analyze/precheck/env_check/io_check.py
@@ -12,6 +12,13 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+import os
+import time
+import json
+import logging
+
+import numpy as np
 
 from msprof_analyze.precheck.env_check.environment_check import HardwareCheck
 
@@ -20,9 +27,214 @@ class IOCheck(HardwareCheck):
 
     def __init__(self, args):
         super().__init__(args)
+        self.global_worker_id = os.getenv("GROUP_RANK")
+        self.local_rank = os.getenv("LOCAL_RANK")
+        self.output = args.output if args is not None else './results.json'
+        self.tmp_work_dir = './data_file' if args and args.no_shared_storage else '/tmp/data_file'
+        self.ensure_tmp_work_dir_exists()
+        self.results = {self.global_worker_id: {
+            'ckpt_read': [],
+            'log_read': [],
+            'ckpt_write': [],
+            'log_write': [],
+        }}
+
+    def ensure_tmp_work_dir_exists(self):
+        """make sure the working directory for data files exists"""
+        if not os.path.exists(self.tmp_work_dir):
+            os.makedirs(self.tmp_work_dir)
+
+    def generate_log_message(self, index):
+        """generate one log message"""
+        return (f"this is a txt file generated for the cluster IO check, you can ignore this information "
+                f"and check the result in another file [{time.strftime('%Y-%m-%d %H:%M:%S')}] "
+                f"this is log message #{index + 1}.\n")
+
+    def get_file_path(self, index, file_type):
+        """build the data file path"""
+        if file_type == "txt":
+            return os.path.join(self.tmp_work_dir,
+                                f'worker_{self.global_worker_id}_data_{index}.txt')
+        elif file_type == "npy":
+            return os.path.join(self.tmp_work_dir,
+                                f'worker_{self.global_worker_id}_data_{index}.npy')
+        else:
+            raise ValueError(f"file type {file_type} is not included in the list [txt, npy]")
+
+    def is_local_rank_zero(self):
+        """check whether the local rank is 0"""
+        return self.local_rank is not None and int(self.local_rank) == 0
+
+    def calculate_speed(self, start_time, end_time, data_size):
+        """compute the read/write speed in MB/s"""
+        elapsed_time = end_time - start_time
+        if elapsed_time <= 0:
+            return 0.0
+        return data_size / elapsed_time / (1024 ** 2)
+
+    def generate_random_weights(self, shape=(4, 2048, 4096)):
+        """generate random weights"""
+        return np.random.randn(*shape).astype(np.float32)
+
+    def generate_file_like_log(self, iteration: int = 10000, scaler: int = 100):
+        """write a log-like data file and record the write speed"""
+        index = self.local_rank
+        file_path = self.get_file_path(index, file_type='txt')
+        start_time = time.time()
+        total_data_size = 0
+        with open(file_path, 'w', encoding='utf-8') as file:
+            for i in range(iteration):
+                log_message = self.generate_log_message(i) * scaler
+                file.write(log_message)
+                file.flush()
+                total_data_size += len(log_message)
+        end_time = time.time()
+        write_speed = self.calculate_speed(start_time, end_time, total_data_size)
+        self.results[self.global_worker_id]['log_write'].append(write_speed)
+        return file_path
+
+    def read_file_like_log(self, file_path=None):
+        """read a single log-like file and record the read speed"""
+        if file_path is None:
+            file_path = self.get_file_path(self.local_rank, file_type='txt')
+        try:
+            start_time = time.time()
+            data = ""
+            with open(file_path, 'r', encoding='utf-8') as file:
+                for line in file:
+                    data += line
+            end_time = time.time()
+            data_size = len(data.encode('utf-8'))
+            read_speed = self.calculate_speed(start_time, end_time, data_size)
+            self.results[self.global_worker_id]['log_read'].append(read_speed)
+            return data
+        except FileNotFoundError:
+            logging.error(f"File {file_path} not found.")
+        except UnicodeDecodeError:
+            logging.error(f"Error decoding {file_path} as UTF-8.")
+        except Exception as e:
+            logging.error(f"Unexpected error reading {file_path}: {e}")
+        return None
+
+    def generate_file_like_ckpt(self):
+        """write a checkpoint-like data file and record the write speed"""
+        index = self.local_rank
+        file_path = self.get_file_path(index, file_type='npy')
+        weight = self.generate_random_weights()
+        total_data_size = weight.size * weight.itemsize
+        start_time = time.time()
+        np.save(file_path, weight)
+        end_time = time.time()
+        write_speed = self.calculate_speed(start_time, end_time, total_data_size)
+        self.results[self.global_worker_id]['ckpt_write'].append(write_speed)
+        return file_path
+
+    def read_file_like_ckpt(self, file_path=None):
+        """read a single checkpoint-like file and record the read speed"""
+        if file_path is None:
+            file_path = self.get_file_path(self.local_rank, file_type='npy')
+        try:
+            start_time = time.time()
+            data = np.load(file_path)
+            end_time = time.time()
+            data_size = data.nbytes
+            read_speed = self.calculate_speed(start_time, end_time, data_size)
+            self.results[self.global_worker_id]['ckpt_read'].append(read_speed)
+            return data
+        except FileNotFoundError:
+            logging.error(f"File {file_path} not found.")
+        except ValueError:
+            logging.error(f"Error loading {file_path} as a npy file.")
+        except Exception as e:
+            logging.error(f"Unexpected error reading {file_path}: {e}")
+        return None
+
+    def clean_cache_file(self, file_type=None):
+        """remove cache files after the check to avoid filling up the disk"""
+        if file_type == 'txt':
+            file_path = self.get_file_path(self.local_rank, file_type='txt')
+        elif file_type == 'npy':
+            file_path = self.get_file_path(self.local_rank, file_type='npy')
+        else:
+            raise ValueError(f"no such file type {file_type} can be cleaned, "
+                             f"since cache files are only stored temporarily")
+        try:
+            os.remove(file_path)
+        except FileNotFoundError:
+            logging.error(f"File {file_path} not found, cannot remove.")
+        except PermissionError:
+            logging.error(f"Permission denied when trying to remove {file_path}.")
+        except Exception as e:
+            logging.error(f"Unexpected error removing {file_path}: {e}")
 
     def collect(self):
-        pass
+        """dump the intermediate results for analysis"""
+        try:
+            with open(self.output, 'w') as f:
+                json.dump(self.results, f, indent=4)
+            logging.info(f"Results have been saved to {self.output}")
+        except Exception as e:
+            logging.error(f"Error saving results to {self.output}: {e}")
+
+    def process_dict(self, data):
+        """reduce every list in the (possibly nested) dict to its mean"""
+        if data is None:
+            return None
+        for key, value in data.items():
+            if isinstance(value, dict):
+                data[key] = self.process_dict(value)
+            else:
+                try:
+                    if len(value) > 0:
+                        data[key] = sum(value) / len(value)
+                    else:
+                        data[key] = None
+                except TypeError:
+                    data[key] = value
+        return data
+
+    def get_json_files(self, file_dir):
+        for root, _, files in os.walk(file_dir):
+            for file in files:
+                if file.endswith('.json'):
+                    yield os.path.join(root, file)
+
+    def analyze(self, file_dir):
+        """
+        Walk all JSON files under file_dir, merge the values of each key,
+        and save the averaged result to a file.
+        """
+        rank = os.getenv("RANK")
+        if rank is None or int(rank) != 0:
+            return
+        data_to_merge = self.results
+        files_dir = self.get_json_files(file_dir)
+        for file_path in files_dir:
+            try:
+                with open(file_path, 'r') as file:
+                    file_data = json.load(file)
+                for key, value in file_data.items():
+                    if key in data_to_merge and isinstance(value, dict):
+                        for k, v in value.items():
+                            data_to_merge[key][k].extend(v)
+                    else:
+                        data_to_merge[key] = value
+            except FileNotFoundError:
+                logging.error(f"File {file_path} not found.")
+            except json.JSONDecodeError:
+                logging.error(f"File {file_path} is not a valid JSON file.")
+        data_to_merge = self.process_dict(data_to_merge)
+        with open('./final_result.json', 'w') as f:
+            json.dump(data_to_merge, f, indent=4)
 
-    def analyze(self):
-        pass
+
+if __name__ == "__main__":
+    io = IOCheck(args=None)
+    io.generate_file_like_log()
+    io.read_file_like_log()
+    io.generate_file_like_ckpt()
+    io.read_file_like_ckpt()
+    io.collect()
+    io.analyze('./tmp_result/')
+    io.clean_cache_file('txt')
+    io.clean_cache_file('npy')
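For reference, a sketch of the per-worker result layout IOCheck accumulates and how process_dict() reduces it to averages. The MB/s values are illustrative rather than measured, and args=None mirrors the new __main__ block (assuming HardwareCheck tolerates a None args):

    results = {
        "0": {
            "ckpt_read": [2051.3, 1987.0],
            "log_read": [812.4],
            "ckpt_write": [955.1],
            "log_write": [431.9],
        }
    }
    io = IOCheck(args=None)
    print(io.process_dict(results))
    # -> {'0': {'ckpt_read': 2019.15, 'log_read': 812.4, 'ckpt_write': 955.1, 'log_write': 431.9}}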