diff --git a/profiler/precheck/collect/collector.py b/profiler/precheck/collect/collector.py
new file mode 100644
index 0000000000000000000000000000000000000000..632ce31cdb39defdc31cdaf093e8229d7648e3d4
--- /dev/null
+++ b/profiler/precheck/collect/collector.py
@@ -0,0 +1,441 @@
+import os
+import sys
+sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
+
+import logging
+from pathlib import Path
+import argparse
+import traceback
+import time
+import math
+
+import torch
+import torch_npu
+import torch.distributed as dist
+import numpy as np
+import torch.multiprocessing as mp
+
+from profiler.prof_common.path_manager import PathManager
+from profiler.precheck.manager.group_manager import GroupManager
+from profiler.precheck.common.constant import Constant
+from profiler.precheck.common.time_stat import TimeStat
+from profiler.precheck.common.utils import create_npu_event, event_elaspe_second, parition_sub_group_ranks, \
+    get_master_rank_collect_dir, get_slave_rank_collect_dir, cat_files, is_equal_file_hash, get_quick_hash, \
+    compress_directory
+from profiler.precheck.common.diskmanager import DiskManager
+
+
+class Collector:
+    rank = None
+    local_rank = None
+    group_rank = None
+    device = None
+    master_rank_num = None
+    time_stat = None
+    stream = None
+
+    def __init__(self):
+        self.logger = logging.getLogger(__name__)
+
+    def init(self):
+        self.rank = GroupManager().get_rank()
+        self.local_rank = GroupManager().get_local_rank()
+        torch.npu.set_device(self.local_rank)
+        self.device = torch.device('npu:%d' % self.local_rank)
+        self.world_size = int(os.environ['WORLD_SIZE'])
+        self.time_stat = TimeStat()
+        self.stream = torch_npu.npu.current_stream()
+
+    def gather_rank_data(self, group, gather_tensor, all_gather=False, dst_rank=None):
+        cur_group_size = dist.get_world_size(group)
+        self.logger.debug("[Rank %d] Local rank %d, gather data from %d ranks" % (self.rank, self.local_rank, cur_group_size))
+        wait_event = create_npu_event(self.stream)
+        dist.barrier(group=group)
+        start_event = create_npu_event(self.stream)
+        wait_time = event_elaspe_second(self.stream, wait_event, start_event)
+        if all_gather:
+            gather_list = [torch.zeros_like(gather_tensor, dtype=gather_tensor.dtype, device=self.device) for _ in range(cur_group_size)]
+            dist.all_gather(gather_list, gather_tensor, group=group)
+        else:
+            gather_list = [torch.zeros_like(gather_tensor, dtype=gather_tensor.dtype, device=self.device) for _ in range(cur_group_size)] if self.rank == dst_rank else None
+            dist.gather(gather_tensor, gather_list=gather_list, dst=dst_rank, group=group)
+        end_event = create_npu_event(self.stream)
+        transfer_time = event_elaspe_second(self.stream, start_event, end_event)
+
+        return gather_list, wait_time, transfer_time
+
+    def create_sub_group(self, file_sizes_hash):
+        # Partition the sub-group ranks according to file_sizes
+        file_sizes = [item[0] for item in file_sizes_hash[self.master_rank_num:]]
+        partitions = parition_sub_group_ranks(self.master_rank_num, file_sizes)
+        self.logger.debug("[Rank %d] subgroup partitions %s" % (self.rank, partitions))
+
+        for ranks in partitions:
+            if len(ranks) > 1:
+                wait_event = create_npu_event(self.stream)
+                dist.barrier()
+                start_event = create_npu_event(self.stream)
+                wait_time = event_elaspe_second(self.stream, wait_event, start_event)
+                sub_group = dist.new_group(ranks=ranks, backend='hccl')
+                end_event = create_npu_event(self.stream)
+                transfer_time = event_elaspe_second(self.stream, start_event, end_event)
+
+                self.logger.info('[Rank %d] after new group, ranks: %s, file_sizes_hash %s' % (self.rank, ranks, file_sizes_hash))
+                cur_file_sizes = [file_sizes_hash[r].cpu().tolist()[0] for r in ranks[1:]]
+                cur_file_hashes = [file_sizes_hash[r].cpu().tolist()[1:] for r in ranks[1:]]
+
+                GroupManager().add_rank_sub_group(sub_group=sub_group, ranks=ranks, file_sizes=cur_file_sizes, file_hashes=cur_file_hashes)
+                self.time_stat.init_pg_stat.sub_group_init = [wait_time, transfer_time]
+            else:
+                self.logger.debug('[Rank %d] ranks %s not enough for creating subgroup' % (self.rank, ranks))
+
+    def bd_split_file_size(self, sub_group, split_size=None):
+        split_size_bd = torch.tensor([split_size], dtype=torch.int64, device=self.device) if self.rank == sub_group.master_rank else torch.zeros(1, dtype=torch.int64, device=self.device)
+        wait_event = create_npu_event(self.stream)
+        dist.barrier(group=sub_group.group)
+        start_event = create_npu_event(self.stream)
+        wait_time = event_elaspe_second(self.stream, wait_event, start_event)
+        self.logger.info("[Rank %d] after split size barrier" % self.rank)
+        dist.broadcast(split_size_bd, group=sub_group.group, src=sub_group.master_rank)
+        end_event = create_npu_event(self.stream)
+        transfer_time = event_elaspe_second(self.stream, start_event, end_event)
+        self.logger.info("[Rank %d] after split size broadcast, %s" % (self.rank, split_size_bd))
+
+        self.time_stat.com_stat.broad_splits = [wait_time, transfer_time]
+        return split_size_bd.cpu().item()
+
+    def gather_file_split(self, sub_group, tensor, output_file_dir=None):
+        for i in range(sub_group.max_splits):
+            # Master ranks have no input file; they contribute a zero placeholder buffer
+            if self.rank < self.master_rank_num:
+                cur_tensor = torch.zeros(sub_group.split_file_size, dtype=torch.uint8, device=self.device)
+            else:
+                start_time = time.perf_counter()
+                cur_tensor = tensor[i * sub_group.split_file_size: (i + 1) * sub_group.split_file_size]
+                if len(cur_tensor) < sub_group.split_file_size:
+                    cur_tensor = np.pad(cur_tensor, (0, sub_group.split_file_size - len(cur_tensor)), 'constant', constant_values=0)
+                cur_tensor = torch.tensor(cur_tensor, dtype=torch.uint8, device=self.device)
+                end_time = time.perf_counter()
+                self.time_stat.disk_stat.read_input_file_splits.append(end_time - start_time)
+
+            # gather_rank_data performs its own barrier and timing internally
+            file_tensor_list, wait_time, transfer_time = self.gather_rank_data(dst_rank=sub_group.master_rank, group=sub_group.group, gather_tensor=cur_tensor)
+            self.logger.debug("[Rank %d] gather file split %d, wait time: %f, gather time: %f seconds" % (self.rank, i, wait_time, transfer_time))
+            self.time_stat.com_stat.gather_file_splits.append([wait_time, transfer_time])
+
+            # Record the time spent flushing data from memory_on_chip to RAM and from RAM to disk
+            if file_tensor_list:
+                master_rank_collect_dir = get_master_rank_collect_dir(output_file_dir, self.rank)
+                memory_on_chip_ram_times = []
+                ram_disk_times = []
+                for rank_i, rank in enumerate(sub_group.ranks):
+                    if rank != sub_group.master_rank:
+                        group_rank = rank - self.master_rank_num
+                        rank_dir = get_slave_rank_collect_dir(master_rank_collect_dir, group_rank)
+                        if not os.path.exists(rank_dir):
+                            os.makedirs(rank_dir, exist_ok=True)
+                        rank_file = os.path.join(rank_dir, 'split_%d' % i)
+                        cur_split_size = sub_group.splits[rank_i - 1][i]
+                        if cur_split_size > 0:
+                            start_time = time.perf_counter()
+                            data = file_tensor_list[rank_i][:cur_split_size].cpu().numpy().tobytes()
+                            ram_time = time.perf_counter()
+                            with open(rank_file, 'wb') as f:
+                                f.write(data)
+                            end_time = time.perf_counter()
+                            memory_on_chip_ram_times.append(ram_time - start_time)
+                            ram_disk_times.append(end_time - ram_time)
+
+                self.time_stat.disk_stat.memory_on_chip.append(memory_on_chip_ram_times)
+                self.time_stat.disk_stat.ram_disk.append(ram_disk_times)
+
+                for split_tensor in file_tensor_list:
+                    del split_tensor
+                del file_tensor_list
+                torch.npu.empty_cache()
+
+
+    def concat_file_split(self, output_file_dir):
+        cur_rank_collect_dir = get_master_rank_collect_dir(output_file_dir, self.rank)
+        concat_times = []
+        verify_hash_times = []
+        for rank_i, rank in enumerate(self.sub_group.ranks):
+            # Only process slave ranks; skip the master rank itself
+            if rank == self.rank:
+                continue
+            group_rank = rank - self.master_rank_num
+            rank_dir = get_slave_rank_collect_dir(cur_rank_collect_dir, group_rank)
+            output_file_name = os.path.join(rank_dir, 'merge.zip')
+            file_split_names = []
+            start_time = time.perf_counter()
+            with open(output_file_name, 'wb') as output_file:
+                for split_i in range(self.sub_group.max_splits):
+                    file_split = os.path.join(rank_dir, 'split_%d' % split_i)
+                    if not os.path.exists(file_split):
+                        self.logger.error('[Rank %d] file split %s does not exist' % (self.rank, file_split))
+                    else:
+                        file_split_names.append(file_split)
+            cat_files(output_file_name, input_files=file_split_names)
+            for file_split in file_split_names:
+                os.remove(file_split)
+
+            end_time = time.perf_counter()
+            concat_times.append(end_time - start_time)
+            self.logger.debug('[Rank %d] concat slave rank %s, time: %f seconds' % (self.rank, rank, end_time - start_time))
+
+            start_time = time.perf_counter()
+            output_file_hash = get_quick_hash(output_file_name)
+            self.logger.debug('[Rank %d] rank_i %d, file_hashes: %s' % (self.rank, rank_i, self.sub_group.file_hashes))
+            if not is_equal_file_hash(output_file_hash, self.sub_group.file_hashes[rank_i - 1]):
+                self.logger.error('[Rank %d] Merge file hash mismatch. %s. %s' % (self.rank, output_file_hash, self.sub_group.file_hashes[rank_i - 1]))
+            end_time = time.perf_counter()
+            verify_hash_times.append(end_time - start_time)
+
+        self.time_stat.disk_stat.hash_output_file = verify_hash_times
+        self.time_stat.disk_stat.concat_file = concat_times
+
+
+    def master_node_run(self, local_rank, output_file_dir, world_size, master_addr, master_port, master_rank_num, split_file_size=None):
+        try:
+            # Set the environment variables used by torch.distributed
+            # The master node has node rank 0, so the global rank equals the local rank
+            os.environ["RANK"] = str(local_rank)
+            os.environ["LOCAL_RANK"] = str(local_rank)
+            os.environ["WORLD_SIZE"] = str(world_size)
+            os.environ["MASTER_ADDR"] = master_addr
+            os.environ["MASTER_PORT"] = str(master_port)
+            os.environ["GROUP_RANK"] = "0"
+            os.environ["LOCAL_WORLD_SIZE"] = str(master_rank_num)
+            self.init()
+            self.master_rank_num = master_rank_num
+
+            start_event = create_npu_event(self.stream)
+            self.logger.info('[Rank %d] Start master node process' % self.rank)
+            torch.npu.set_device(self.device)
+            dist.init_process_group(backend='hccl', rank=self.rank, world_size=self.world_size)
+            init_process_group_event = create_npu_event(self.stream)
+            elp_time = event_elaspe_second(self.stream, start_event, init_process_group_event)
+            self.logger.debug('[Rank %d] init process group time %f seconds' % (self.rank, elp_time))
+            self.time_stat.init_pg_stat.global_group_init = elp_time
+
+            self.logger.info("[Rank %d] master node run" % self.rank)
+            # Step 2. Gather the tensor sizes from the slave nodes.
+            gather_tensor = torch.tensor([0, 0, 0, 0, 0, 0, 0, 0, 0], dtype=torch.int64, device=self.device)
+            # Each entry is laid out as (file_size, file_hash)
+            file_sizes_hash, wait_time, transfer_time = self.gather_rank_data(group=dist.group.WORLD, gather_tensor=gather_tensor, all_gather=True)
+            self.time_stat.com_stat.gather_file_size = [wait_time, transfer_time]
+
+            self.logger.debug('[Rank %d] gather file size time %f seconds' % (self.rank, transfer_time))
+
+            # Check that there is enough disk space; decompression needs extra room for temporary files and the original archive
+            file_sizes = [item[0] for item in file_sizes_hash[self.master_rank_num:]]
+            total_file_size = sum(file_sizes)
+
+            total_size_gb = Constant.UNZIP_DISK_SIZE_RAIO * total_file_size / (1024 * 1024 * 1024)
+
+            self.logger.debug('[Rank %d] collect file sizes %s, total size %d bytes' % (self.rank, file_sizes, total_file_size))
+            disk_free, expected_free_space_gb = DiskManager.get_disk_space(output_file_dir, total_size_gb)
+
+            # Step 3. Broadcast the sub-group configuration and create the sub communication groups
+            self.logger.info("[Rank %d] creating sub group %s" % (self.rank, file_sizes_hash))
+            self.create_sub_group(file_sizes_hash)
+            self.sub_group = GroupManager().get_rank_sub_group(self.rank)
+
+            # The logic below is specific to each sub communication group
+            if self.sub_group:
+                self.logger.info("[Rank %d] Subgroup ranks %s, file_sizes %s" % (self.rank, self.sub_group.ranks, self.sub_group.file_sizes))
+
+                # If split_file_size is not specified, derive it from memory_on_chip / rank_num
+                if not split_file_size:
+                    if len(self.sub_group.ranks) > 0:
+                        split_file_size = math.floor(Constant.MASTER_RANK_MEMORY_ON_CHIP / len(self.sub_group.ranks))
+                    else:
+                        self.logger.error("self.sub_group.ranks must not be empty")
+                self.bd_split_file_size(self.sub_group, split_file_size)
+                self.sub_group.split_size(split_file_size)
+                self.logger.info("[Rank %d] Subgroup split file size %s, splits %s" % (self.rank, self.sub_group.split_file_size, self.sub_group.splits))
+                self.gather_file_split(sub_group=self.sub_group, tensor=None, output_file_dir=output_file_dir)
+                self.logger.debug("[Rank %d] start concat file split" % self.rank)
+                self.concat_file_split(output_file_dir=output_file_dir)
+                if len(self.sub_group.ranks) > 1:
+                    self.logger.info(self.time_stat.to_string())
+            else:
+                self.logger.info("[Rank %d] master rank not in sub group" % self.rank)
+            dist.barrier()
+        except Exception as e:
+            self.logger.error("[ERROR] %s", e, exc_info=True)
+            raise e
+        finally:
+            dist.destroy_process_group()
+
+
+    def slave_node_run(self, input_file_dir, world_size, master_addr, master_port, master_rank_num, rank, node_rank):
+        try:
+            self.logger.debug('Enter slave node run wrapper')
+            # Set the environment variables used by torch.distributed
+            os.environ["RANK"] = str(rank)
+            os.environ["LOCAL_RANK"] = "0"
+            os.environ["WORLD_SIZE"] = str(world_size)
+            os.environ["MASTER_ADDR"] = master_addr
+            os.environ["MASTER_PORT"] = str(master_port)
+            os.environ["GROUP_RANK"] = str(node_rank)
+            os.environ["LOCAL_WORLD_SIZE"] = "1"
+            self.init()
+            self.master_rank_num = master_rank_num
+            torch.npu.set_device(self.device)
+            start_event = create_npu_event(self.stream)
+            dist.init_process_group(backend='hccl', rank=self.rank, world_size=self.world_size)
+            init_process_group_event = create_npu_event(self.stream)
+            elp_time = event_elaspe_second(self.stream, start_event, init_process_group_event)
+            self.time_stat.init_pg_stat.global_group_init = elp_time
+
+            self.logger.debug('[Rank %d] init process group time %f seconds' % (self.rank, elp_time))
+            self.logger.info('[Rank %d] Start slave node process' % self.rank)
+
+            # Step 2. Compress the input data and record its size before entering the gather logic
+            if os.path.isfile(input_file_dir):
+                file_path = input_file_dir
+            else:
+                PathManager.check_path_writeable(input_file_dir)
+                file_path = os.path.join(str(Path(input_file_dir).parent), 'compress.tar')
+                start_time = time.perf_counter()
+                compress_directory(input_file_dir, file_path)
+                end_time = time.perf_counter()
+                self.time_stat.disk_stat.compress_input_file = end_time - start_time
+                self.logger.info("[Rank %d] Compress directory time: %f seconds" % (self.rank, end_time - start_time))
+            file_size = os.path.getsize(file_path)
+            start_time = time.perf_counter()
+            file_hash_chunks = get_quick_hash(file_path)
+            end_time = time.perf_counter()
+            self.time_stat.disk_stat.hash_input_file = end_time - start_time
+            self.logger.info("[Rank %d] Hash input file time: %f seconds" % (self.rank, end_time - start_time))
+            file_hash_chunks.insert(0, file_size)
+            self.logger.info(
+                "[Rank %d] File hash chunks (first element is file size): %s" % (self.rank, file_hash_chunks))
+            gather_tensor = torch.tensor(file_hash_chunks, dtype=torch.int64, device=self.device)
+
+            file_sizes_hash, wait_time, transfer_time = self.gather_rank_data(group=dist.group.WORLD, gather_tensor=gather_tensor, all_gather=True)
+            self.time_stat.com_stat.gather_file_size = [wait_time, transfer_time]
+            self.logger.info("[Rank %d] Gather file size - wait time: %f seconds, transfer time: %f seconds" % (
+                self.rank, wait_time, transfer_time))
+            # Step 3. Create the sub communication groups
+            self.logger.debug("[Rank %d] creating sub group %s" % (self.rank, file_sizes_hash))
+            self.create_sub_group(file_sizes_hash)
+            self.sub_group = GroupManager().get_rank_sub_group(self.rank)
+
+            # The logic below is specific to each sub communication group
+            if self.sub_group:
+                # Step 4. Receive the broadcast split size
+                self.logger.info("[Rank %d] Subgroup ranks %s, file_sizes %s" % (self.rank, self.sub_group.ranks, self.sub_group.file_sizes))
+                split_file_size = self.bd_split_file_size(self.sub_group)
+                self.sub_group.split_size(split_file_size)
+                file_tensor = np.memmap(file_path, dtype=np.uint8, mode='r')
+                self.gather_file_split(sub_group=self.sub_group, tensor=file_tensor)
+                self.logger.info(self.time_stat.to_string())
+            else:
+                self.logger.warning("[Rank %d] slave rank not in sub group" % self.rank)
+            dist.barrier()
+        except Exception as e:
+            self.logger.error("[ERROR] %s", e, exc_info=True)
+            raise e
+        finally:
+            dist.destroy_process_group()
+
+    def run(self, input_file_dir, output_file_dir, nnodes, node_rank, master_addr, master_port, master_rank_num, split_file_size, time_out, log_file):
+        logging.basicConfig(
+            filename=log_file,  # File to write logs to
+            level=logging.DEBUG,  # Minimum logging level to write to the file
+            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'  # Log message format
+        )
+        self.logger.info({"message": "Run method arguments",
+                          "class": self.__class__.__name__,
+                          "method": sys._getframe().f_code.co_name,
+                          "args": {
+                              "input_file_dir": input_file_dir,
+                              "output_file_dir": output_file_dir,
+                              "nnodes": nnodes,
+                              "node_rank": node_rank,
+                              "master_addr": master_addr,
+                              "master_port": master_port,
+                              "master_rank_num": master_rank_num,
+                              "split_file_size": split_file_size,
+                              "time_out": time_out,
+                              "log_file": log_file
+                          }})
+
+        try:
+            # Calculate the world size
+            world_size = nnodes + master_rank_num - 1
+            # Master node logic
+            if node_rank == 0:
+                processes = []
+                for i in range(master_rank_num):
+                    process = mp.Process(target=self.master_node_run, args=(i, output_file_dir, world_size, master_addr, master_port, master_rank_num, split_file_size))
+                    self.logger.info("Start master node subprocess %d." % i)
+                    process.start()
+                    processes.append(process)
+                start_time = time.perf_counter()
+                try:
+                    while True:
+                        all_done = all(not process.is_alive() for process in processes)
+                        if all_done:
+                            self.logger.info("All subprocesses finished successfully.")
+                            break
+                        elapsed_time = time.perf_counter() - start_time
+                        if elapsed_time > time_out:
+                            raise TimeoutError("Timeout reached. Terminating all subprocesses.")
+                        time.sleep(5)
+
+                except TimeoutError as e:
+                    self.logger.error("%s", e, exc_info=True)
+                    for process in processes:
+                        if process.is_alive():
+                            process.terminate()
+                            process.join()
+                finally:
+                    # Ensure all processes are cleaned up
+                    for process in processes:
+                        process.join()
+            # Slave node logic
+            else:
+                rank = node_rank + master_rank_num - 1
+                self.slave_node_run(input_file_dir, world_size, master_addr, master_port, master_rank_num, rank, node_rank)
+        except Exception as e:
+            self.logger.error("[ERROR] %s", e, exc_info=True)
+            raise e
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--input_file_dir", type=str, help='input profiling data dir')
+    parser.add_argument("--output_file_dir", type=str, help='output dir for the collected profiling data')
+    parser.add_argument("--nnodes", type=int, help='the total node number')
+    parser.add_argument("--node_rank", type=int, help='node rank in the cluster')
+    parser.add_argument("--master_addr", type=str, help='master address')
+    parser.add_argument("--master_port", type=int, default=29501, help='master port')
+    parser.add_argument("--master_rank_num", type=int, default=8, help='number of master ranks')
+
+    parser.add_argument("--split_file_size", type=int, default=None, help='split file size')
+
+    # Overall timeout for the master node processes
+    parser.add_argument("--time_out", type=int, default=Constant.DEFAULT_TIME_OUT,
+                        help='total process timeout in seconds')
+    parser.add_argument("--log_file", type=str, default=None, help='logging file')
+    args = parser.parse_args()
+
+    logging.basicConfig(
+        filename=args.log_file,  # File to write logs to
+        level=logging.DEBUG,  # Minimum logging level to write to the file
+        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'  # Log message format
+    )
+    logger = logging.getLogger(__name__)
+
+    collector = Collector()
+    logger.debug(vars(args))
+    args_dict = vars(args)
+
+    try:
+        collector.run(**args_dict)
+    except Exception as e:
+        logger.error("[ERROR] %s", e, exc_info=True)
+        raise e
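
Usage sketch for reviewers (not part of the patch): the calls below mirror the run() signature and the CLI flags defined above. The paths, addresses, node count, and timeout are placeholders, and the sketch assumes the repository root is on PYTHONPATH so the profiler.precheck imports resolve.

    # Hypothetical example; every literal value is a placeholder.
    from profiler.precheck.collect.collector import Collector

    collector = Collector()
    # Master node (node_rank 0): spawns master_rank_num gather subprocesses and writes the merged archives.
    collector.run(input_file_dir=None, output_file_dir='/data/collect_out', nnodes=3, node_rank=0,
                  master_addr='10.0.0.1', master_port=29501, master_rank_num=8,
                  split_file_size=None, time_out=3600, log_file='/tmp/precheck_master.log')
    # Each slave node (node_rank 1..nnodes-1): compresses its local profiling data and streams it to its master rank.
    # collector.run(input_file_dir='/data/prof', output_file_dir=None, nnodes=3, node_rank=1,
    #               master_addr='10.0.0.1', master_port=29501, master_rank_num=8,
    #               split_file_size=None, time_out=3600, log_file='/tmp/precheck_slave.log')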