From 5c3f4304d0e30d0cb68f7015193dd287b9d41584 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=B9=E5=B3=BB=E9=93=AD?= Date: Wed, 4 Dec 2024 15:31:46 +0800 Subject: [PATCH 1/4] add collector.py --- profiler/precheck/collect/collector.py | 420 +++++++++++++++++++++++++ 1 file changed, 420 insertions(+) create mode 100644 profiler/precheck/collect/collector.py diff --git a/profiler/precheck/collect/collector.py b/profiler/precheck/collect/collector.py new file mode 100644 index 00000000000..36b4b69ee73 --- /dev/null +++ b/profiler/precheck/collect/collector.py @@ -0,0 +1,420 @@ +import sys +sys.path.append("../../..") + +import os +import logging +from pathlib import Path +from profiler.prof_common.path_manager import PathManager + +import torch +import torch_npu +import torch.distributed as dist +from profiler.precheck.group_manager.group_manager import GroupManager +from profiler.precheck.common.utils import * +from profiler.precheck.common.time_stat import * +from profiler.precheck.common.diskmanager import DiskManager +import numpy as np +import torch.multiprocessing as mp +import pdb +import argparse +import shutil +import traceback +import time +import math + +class Collector: + rank = None + local_rank = None + group_rank = None + device = None + master_rank_num = None + time_stat = None + stream = None + logger = None + + + def init(self): + self.rank = GroupManager().get_rank() + self.local_rank = GroupManager().get_local_rank() + torch.npu.set_device(self.local_rank) + self.device = torch.device('npu:%d' % self.local_rank) + self.world_size = int(os.environ['WORLD_SIZE']) + self.time_stat = TimeStat() + self.stream = torch_npu.npu.current_stream() + + def gather_rank_data(self, group, gather_tensor, all_gather=False, dst_rank=None) -> list: + cur_group_size = dist.get_world_size(group) + self.logger.debug("[Rank %d] Local rank %d, gather data from %d ranks" % (self.rank, self.local_rank, cur_group_size)) + wait_event = create_npu_event(self.stream) + dist.barrier(group=group) + start_event = create_npu_event(self.stream) + wait_time = event_elaspe_second(self.stream, wait_event, start_event) + if all_gather: + gather_list = [torch.zeros_like(gather_tensor, dtype=gather_tensor.dtype, device=self.device) for _ in range(cur_group_size)] + dist.all_gather(gather_list, gather_tensor, group=group) + else: + gather_list = [torch.zeros_like(gather_tensor, dtype=gather_tensor.dtype, device=self.device) for _ in range(cur_group_size)] if self.rank == dst_rank else None + dist.gather(gather_tensor, gather_list=gather_list, dst=dst_rank, group=group) + end_event = create_npu_event(self.stream) + transfer_time = event_elaspe_second(self.stream, start_event, end_event) + + return gather_list, wait_time, transfer_time + + def create_sub_group(self, file_sizes_hash): + #需要根据file_sizes来划分sub_group ranks + file_sizes = [item[0] for item in file_sizes_hash[self.master_rank_num:]] + partitions = parition_sub_group_ranks(self.master_rank_num, file_sizes) + self.logger.debug("[Rank %d] subgroup partiitons %s" % (self.rank, partitions)) + + for ranks in partitions: + if len(ranks) > 1: + wait_event = create_npu_event(self.stream) + dist.barrier() + start_event = create_npu_event(self.stream) + wait_time = event_elaspe_second(self.stream, wait_event, start_event) + sub_group = dist.new_group(ranks = ranks, backend='hccl') + end_event = create_npu_event(self.stream) + transfer_time = event_elaspe_second(self.stream,start_event, end_event) + + self.logger.info('[Rank %d] after new group, ranks: %s, 
file_sizes_hash %s' % (self.rank, ranks, file_sizes_hash)) + cur_file_sizes = [file_sizes_hash[r].cpu().tolist()[0] for r in ranks[1:]] + cur_file_hashes = [file_sizes_hash[r].cpu().tolist()[1:] for r in ranks[1:]] + + GroupManager().add_rank_sub_group(sub_group=sub_group, ranks=ranks, file_sizes=cur_file_sizes, file_hashes=cur_file_hashes) + else: + self.logger.debug('[Rank %d] ranks %s not enough for creating subgroup' % (self.rank, ranks)) + self.time_stat.init_pg_stat.sub_group_init = [wait_time, transfer_time] + + def bd_split_file_size(self, sub_group, split_size=None): + split_size_bd = torch.tensor([split_size], dtype=torch.int64, device=self.device) if self.rank == sub_group.master_rank else torch.zeros(1, dtype=torch.int64, device=self.device) + wait_event = create_npu_event(self.stream) + dist.barrier(group=sub_group.group) + start_event = create_npu_event(self.stream) + wait_time = event_elaspe_second(self.stream, wait_event, start_event) + self.logger.info("[Rank %d] after split size barrier" % self.rank) + dist.broadcast(split_size_bd, group=sub_group.group, src=sub_group.master_rank) + end_event = create_npu_event(self.stream) + transfer_time = event_elaspe_second(self.stream, start_event, end_event) + self.logger.info("[Rank %d] after split size bd, %s" % (self.rank, split_size_bd)) + + self.time_stat.com_stat.broad_splits = [wait_time, transfer_time] + return split_size_bd.cpu().item() + + def gather_file_split(self, sub_group, tensor, output_file_dir=None): + for i in range(sub_group.max_splits): + #is master node + if self.rank < self.master_rank_num: + cur_tensor = torch.zeros(sub_group.split_file_size, dtype=torch.uint8, device=self.device) + else: + start_time = time.perf_counter() + cur_tensor = tensor[i*sub_group.split_file_size: (i+1)*sub_group.split_file_size] + if len(cur_tensor) < sub_group.split_file_size: + cur_tensor = np.pad(cur_tensor, (0, sub_group.split_file_size-len(cur_tensor)), 'constant', constant_values=0) + cur_tensor = torch.tensor(cur_tensor, dtype=torch.uint8, device=self.device) + end_time = time.perf_counter() + self.time_stat.disk_stat.read_input_file_splits.append(end_time-start_time) + + #gather rank data内部有barrier与计时 + file_tensor_list, wait_time, transfer_time = self.gather_rank_data(dst_rank=sub_group.master_rank, group=sub_group.group, gather_tensor=cur_tensor) + self.logger.debug("[Rank %d] gather file split %d, wait time: %f, gather time: %f seconds" % (self.rank, i, wait_time, transfer_time)) + self.time_stat.com_stat.gather_file_splits.append([wait_time, transfer_time]) + + #TODO: 需要记录从HBM刷到硬盘中的耗时 + if file_tensor_list: + master_rank_collect_dir = get_master_rank_collect_dir(output_file_dir, self.rank) + hbm_ram_times = [] + ram_disk_times = [] + for rank_i, rank in enumerate(sub_group.ranks): + if rank != sub_group.master_rank: + group_rank = rank - self.master_rank_num + rank_dir = get_slave_rank_collect_dir(master_rank_collect_dir, group_rank) + if not os.path.exists(rank_dir): + os.makedirs(rank_dir, exist_ok=True) + rank_file = os.path.join(rank_dir, 'split_%d' % i) + cur_split_size = sub_group.splits[rank_i -1][i] + if cur_split_size > 0: + start_time = time.perf_counter() + data = file_tensor_list[rank_i][:cur_split_size].cpu().numpy().tobytes() + ram_time = time.perf_counter() + with open(rank_file, 'wb') as f: + f.write(data) + end_time = time.perf_counter() + hbm_ram_times.append(ram_time-start_time) + ram_disk_times.append(end_time-ram_time) + + self.time_stat.disk_stat.hbm_ram.append(hbm_ram_times) + 
self.time_stat.disk_stat.ram_disk.append(ram_disk_times) + + for tensor in file_tensor_list: + del tensor + del file_tensor_list + torch.npu.empty_cache() + + + def concat_file_split(self, output_file_dir): + cur_rank_collect_dir = get_master_rank_collect_dir(output_file_dir, self.rank) + concat_times = [] + verify_hash_times = [] + for rank_i, rank in enumerate(self.sub_group.ranks): + #只提取slave rank的case + if rank == self.rank: + continue + group_rank = rank - self.master_rank_num + rank_dir = get_slave_rank_collect_dir(cur_rank_collect_dir, group_rank) + output_file_name = os.path.join(rank_dir, 'merge.tar') + # merge_file_size = 0 + file_split_names = [] + start_time = time.perf_counter() + with open(output_file_name, 'wb') as output_file: + for split_i in range(self.sub_group.max_splits): + file_split = os.path.join(rank_dir, 'split_%d' % split_i) + if not os.path.exists(file_split): + self.logger.error('[Rank %d] not exist file split %s' % (self.rank, file_split)) + else: + file_split_names.append(file_split) + cat_files(output_file_name, input_files=file_split_names) + for file_split in file_split_names: + os.remove(file_split) + + end_time = time.perf_counter() + concat_times.append(end_time-start_time) + self.logger.debug('[Rank %d] concate slave rank %s, time: %f seconds' % (self.rank, rank, end_time-start_time)) + + start_time = time.perf_counter() + output_file_hash = get_quick_hash(output_file_name) + logger.debug('[Rank %d] rank_i %d, file_hashs:%s' % (self.rank,rank_i, self.sub_group.file_hashes)) + if not is_equal_file_hash(output_file_hash, self.sub_group.file_hashes[rank_i-1]): + self.logger.error('[Rank %d] Not equal merge file hash. %s. %s' % (self.rank, output_file_hash, self.sub_group.file_hashes[rank_i-1])) + end_time = time.perf_counter() + verify_hash_times.append(end_time-start_time) + + self.time_stat.disk_stat.hash_output_file = verify_hash_times + self.time_stat.disk_stat.concat_file = concat_times + + + def master_node_run(self,local_rank, output_file_dir, world_size, master_addr, master_port, master_rank_num, split_file_size=None): + try: + #设置环境变量,这些会在torch.dist中用到 + #因为master node rank为0, 所以global rank直接等于local rank + os.environ["RANK"] = str(local_rank) + os.environ["LOCAL_RANK"] = str(local_rank) + os.environ["WORLD_SIZE"] = str(world_size) + os.environ["MASTER_ADDR"] = master_addr + os.environ["MASTER_PORT"] = str(master_port) + os.environ["GROUP_RANK"] = "0" + os.environ["LOCAL_WORLD_SIZE"] = str(master_rank_num) + self.init() + self.master_rank_num = master_rank_num + + start_event = create_npu_event(self.stream) + self.logger.info('[Rank %d] Start master node process' % self.rank) + torch.npu.set_device(self.device) + dist.init_process_group(backend='hccl', rank=self.rank, world_size=self.world_size) + init_process_group_event = create_npu_event(self.stream) + elp_time = event_elaspe_second(self.stream, start_event, init_process_group_event) + self.logger.debug('[Rank %d] init process group time %f seconds' % (self.rank, elp_time)) + self.time_stat.init_pg_stat.global_group_init = elp_time + + self.logger.info("[Rank %d] master node run" % (self.rank)) + # Step 2. Gather tensor size from slave node. 
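+            # Note on the gathered payload (a sketch of the layout inferred from
+            # slave_node_run, assuming get_quick_hash returns eight int64 chunks):
+            #   index 0    -> input file size in bytes
+            #   index 1..8 -> quick-hash chunks of the (compressed) input file
+            # Master ranks have no input file, so they contribute an all-zero
+            # placeholder of the same shape so the all_gather shapes match, e.g.
+            #   slave:  torch.tensor([file_size, h0, ..., h7], dtype=torch.int64)
+            #   master: torch.tensor([0] * 9, dtype=torch.int64)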
+ gather_tensor = torch.tensor([0,0,0,0,0,0,0,0,0], dtype=torch.int64, device=self.device) + # (file_size, file_hash) + file_sizes_hash, wait_time, transfer_time = self.gather_rank_data(group=dist.group.WORLD, gather_tensor=gather_tensor, all_gather=True) + self.time_stat.com_stat.gather_file_size = [wait_time, transfer_time] + + self.logger.debug('[Rank %d] gather file size time %f seconds' % (self.rank, transfer_time)) + + #判断硬盘空间是否足够,解压的过程中需要额外的空间存储临时文件与原压缩包 + file_sizes = [item[0] for item in file_sizes_hash[self.master_rank_num:]] + total_file_size = sum(file_sizes) + + total_size_gb = Constant.UNZIP_DISK_SIZE_RAIO * total_file_size / (1024*1024*1024) + + self.logger.debug('[Rank %d] collect file sizes %s, total size %fgb' % (self.rank, file_sizes, total_file_size)) + disk_free, expected_free_space_gb = DiskManager.get_disk_space(output_file_dir, total_size_gb) + + # Step 3. broadcast子通信域配置,建立子通信域 + self.logger.info("[Rank %d] creating sub group %s" % (self.rank, file_sizes_hash)) + self.create_sub_group(file_sizes_hash) + self.sub_group = GroupManager().get_rank_sub_group(self.rank) + + #以下进入每个子通信域特定的逻辑 + if self.sub_group: + self.logger.info("[Rank %d] Subgroup ranks %s, file_sizes %s" % (self.rank, self.sub_group.ranks, self.sub_group.file_sizes)) + + #未指定split file size的话,根据HBM/rank_num计算 + if not split_file_size: + split_file_size = math.floor(Constant.MASTER_RANK_HBM / (len(self.sub_group.ranks))) + + self.bd_split_file_size(self.sub_group, split_file_size) + # bd_split_file_size_event = create_npu_event(stream) + # logging.debug('[Rank %d] bd split file size %d, time %f seconds' % (self.rank, split_file_size, event_elaspe_second(stream, create_sub_group_event, bd_split_file_size_event))) + + self.sub_group.split_size(split_file_size) + self.logger.info("[Rank %d] Subgroup split file size %s, splits %s" % (self.rank, self.sub_group.split_file_size, self.sub_group.splits)) + self.gather_file_split(sub_group=self.sub_group, tensor=None, output_file_dir=output_file_dir) + self.logger.debug("[Rank %d] start concat file split" % self.rank) + self.concat_file_split(output_file_dir=output_file_dir) + if len(self.sub_group.ranks) > 1: + logger.info(self.time_stat.to_string()) + else: + self.logger.info("[Rank %d] master rank not in sub group" % self.rank) + dist.barrier() + except Exception as e: + logger.error("[ERROR] %s", e, exc_info=True) + raise e + finally: + dist.destroy_process_group() + + + def slave_node_run(self, input_file_dir, world_size, master_addr, master_port, master_rank_num, rank, node_rank): + try: + self.logger.debug('Enter slave node run wrapper') + #设置环境变量,这些会在torch.dist中用到 + os.environ["RANK"] = str(rank) + os.environ["LOCAL_RANK"] = "0" + os.environ["WORLD_SIZE"] = str(world_size) + os.environ["MASTER_ADDR"] = master_addr + os.environ["MASTER_PORT"] = str(master_port) + os.environ["GROUP_RANK"] = str(node_rank) + os.environ["LOCAL_WORLD_SIZE"] = "1" + self.init() + self.master_rank_num = master_rank_num + torch.npu.set_device(self.device) + start_event = create_npu_event(self.stream) + dist.init_process_group(backend='hccl', rank=self.rank, world_size=self.world_size) + init_process_group_event = create_npu_event(self.stream) + elp_time = event_elaspe_second(self.stream, start_event, init_process_group_event) + self.time_stat.init_pg_stat.global_group_init = elp_time + + self.logger.debug('[Rank %d] init process group time %f seconds' % (self.rank, elp_time)) + self.logger.info('[Rank %d] Start slave node process' % self.rank) + + # Step2. 
先压缩文件,统计文件大小,再进入到gather逻辑里 + if os.path.isfile(input_file_dir): + file_path = input_file_dir + else: + #TODO: 在这里检查是否有目录写权限 + file_path = os.path.join(str(Path(input_file_dir).parent), 'compress.tar') + compress_directory(input_file_dir, file_path) + + #TODO: 计算file hash的速度可能需要优化。将读文件与计算file hash的时间抽出来 + file_size = os.path.getsize(file_path) + start_time = time.perf_counter() + file_hash_chunks = get_quick_hash(file_path) + end_time = time.perf_counter() + self.time_stat.disk_stat.hash_input_file = end_time - start_time + file_hash_chunks.insert(0, file_size) + gather_tensor = torch.tensor(file_hash_chunks, dtype=torch.int64, device=self.device) + + file_sizes_hash, wait_time, transfer_time = self.gather_rank_data(group=dist.group.WORLD, gather_tensor=gather_tensor, all_gather=True) + self.time_stat.com_stat.gather_file_size = [wait_time, transfer_time] + # logging.debug('[Rank %d] gather file size time %f seconds' % (self.rank, event_elaspe_second(stream, init_process_group_event, gather_file_size_event))) + + #Step3. 建立子通信域 + self.logger.debug("[Rank %d] creating sub group %s" % (self.rank, file_sizes_hash)) + self.create_sub_group(file_sizes_hash) + self.sub_group = GroupManager().get_rank_sub_group(self.rank) + + #进入每个子通信域特定的逻辑 + if self.sub_group: + #Step4. broacast split size大小 + self.logger.info("[Rank %d] Subgroup ranks %s, file_sizes %s" % (self.rank, self.sub_group.ranks, self.sub_group.file_sizes)) + split_file_size = self.bd_split_file_size(self.sub_group) + self.sub_group.split_size(split_file_size) + file_tensor = np.memmap(file_path, dtype=np.uint8, mode='r') + self.gather_file_split(sub_group=self.sub_group, tensor=file_tensor) + logger.info(self.time_stat.to_string()) + else: + self.logger.warning("[Rank %d] slave rank not in sub group" % (self.rank)) + dist.barrier() + except Exception as e: + self.logger.error("[ERROR] %s", e, exc_info=True) + raise e + finally: + dist.destroy_process_group() + + def run(self, input_file_dir, output_file_dir, nnodes, node_rank, master_addr, master_port, master_rank_num, split_file_size, time_out, log_file): + logging.basicConfig( + filename=log_file, # File to write logs to + level=logging.DEBUG, # Minimum logging level to write to the file + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' # Log message format + ) + self.logger = logging.getLogger(__name__) + + try: + #calculate world size + world_size = nnodes + master_rank_num - 1 + #master node的逻辑 + if node_rank == 0: + processes = [] + for i in range(master_rank_num): + process = mp.Process(target=self.master_node_run, args=(i, output_file_dir, world_size, master_addr, master_port, master_rank_num, split_file_size)) + logger.info("Start master node subprocess %d." % i) + process.start() + processes.append(process) + start_time = time.perf_counter() + try: + while True: + all_done = all(not process.is_alive() for process in processes) + if all_done: + logger.info("All subprocesses finished successfully.") + break + #TODO: 检查当前output_file_dir路径是否有写权限 + elapsed_time = time.perf_counter() - start_time + time.sleep(5) + if elapsed_time > time_out: + raise TimeoutError("Timeout reached. 
Terminating all subprocesses.") + + except TimeoutError as e: + logger.error("%s", e, exc_info=True) + for process in processes: + if process.is_alive(): + process.terminate() + process.join() + finally: + # Ensure all processes are cleaned up + for process in processes: + process.join() + #slave node的逻辑 + else: + rank = node_rank + master_rank_num -1 + self.slave_node_run(input_file_dir, world_size, master_addr, master_port, master_rank_num, rank, node_rank) + except Exception as e: + logger.error("%s", e, exc_info=True) + raise e + + +if __name__ == "__main__": + try: + parser = argparse.ArgumentParser() + parser.add_argument("--input_file_dir", type=str, help='input profiling data dir') + parser.add_argument("--output_file_dir", type=str, help='input profiling data dir') + parser.add_argument("--nnodes", type=int, help='the total node number') + parser.add_argument("--node_rank", type=int, help='node rank in the cluster') + parser.add_argument("--master_addr", type=str, help='master address') + parser.add_argument("--master_port", type=int, default=29501, help='master port') + parser.add_argument("--master_rank_num", type=int, default=8, help='master rank nums') + + #TODO: 有两个可能的切分mode。第一种(也就是目前实现的)是每个master rank同时传所有的子通信域的数据,根据HBM/slave_node_num 来确定split file size。第二种是指定好每次split file size, 然后根据HBM/split file size来指定slave node num, 通过排队的方式,依次建立子通信域。两种方式的效率需要在实际集群场景下进行测试。 + parser.add_argument("--split_file_size", type=int, default=None, help='split file size') + + #master node整体time out的时间 + parser.add_argument("--time_out", type=int, default=Constant.DEFAULT_TIME_OUT, help='totoal process time out in seconds') + parser.add_argument("--log_file", type=str, default=None, help='logging file') + args = parser.parse_args() + + logging.basicConfig( + filename=args.log_file, # File to write logs to + level=logging.DEBUG, # Minimum logging level to write to the file + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' # Log message format + ) + logger = logging.getLogger(__name__) + + collector = Collector() + logger.debug(vars(args)) + args_dict = vars(args) + collector.run(**args_dict) + except Exception as e: + logger.error("%s", e, exc_info=True) + raise e -- Gitee From d7cdb80c5ef9385b391e5f9873a84382dc4f82d3 Mon Sep 17 00:00:00 2001 From: hhqx Date: Wed, 4 Dec 2024 15:32:24 +0800 Subject: [PATCH 2/4] fix collector.py --- profiler/precheck/collect/collector.py | 301 ++++++++++++++----------- 1 file changed, 174 insertions(+), 127 deletions(-) diff --git a/profiler/precheck/collect/collector.py b/profiler/precheck/collect/collector.py index 36b4b69ee73..f074fd62623 100644 --- a/profiler/precheck/collect/collector.py +++ b/profiler/precheck/collect/collector.py @@ -1,18 +1,22 @@ import sys -sys.path.append("../../..") - import os + +from profiler.precheck.common.constant import Constant +from profiler.precheck.common.time_stat import TimeStat +from profiler.precheck.common.utils import create_npu_event, event_elaspe_second, parition_sub_group_ranks, \ + get_master_rank_collect_dir, get_slave_rank_collect_dir, cat_files, is_equal_file_hash, get_quick_hash, \ + compress_directory + +sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) + import logging from pathlib import Path from profiler.prof_common.path_manager import PathManager -import torch +import torch import torch_npu import torch.distributed as dist -from profiler.precheck.group_manager.group_manager import GroupManager -from profiler.precheck.common.utils import * -from 
profiler.precheck.common.time_stat import * -from profiler.precheck.common.diskmanager import DiskManager +from profiler.precheck.manager.group_manager import GroupManager import numpy as np import torch.multiprocessing as mp import pdb @@ -22,6 +26,7 @@ import traceback import time import math + class Collector: rank = None local_rank = None @@ -30,8 +35,9 @@ class Collector: master_rank_num = None time_stat = None stream = None - logger = None + def __init__(self): + self.logger = logging.getLogger(__name__) def init(self): self.rank = GroupManager().get_rank() @@ -44,16 +50,19 @@ class Collector: def gather_rank_data(self, group, gather_tensor, all_gather=False, dst_rank=None) -> list: cur_group_size = dist.get_world_size(group) - self.logger.debug("[Rank %d] Local rank %d, gather data from %d ranks" % (self.rank, self.local_rank, cur_group_size)) + self.logger.debug( + "[Rank %d] Local rank %d, gather data from %d ranks" % (self.rank, self.local_rank, cur_group_size)) wait_event = create_npu_event(self.stream) dist.barrier(group=group) start_event = create_npu_event(self.stream) wait_time = event_elaspe_second(self.stream, wait_event, start_event) if all_gather: - gather_list = [torch.zeros_like(gather_tensor, dtype=gather_tensor.dtype, device=self.device) for _ in range(cur_group_size)] + gather_list = [torch.zeros_like(gather_tensor, dtype=gather_tensor.dtype, device=self.device) for _ in + range(cur_group_size)] dist.all_gather(gather_list, gather_tensor, group=group) else: - gather_list = [torch.zeros_like(gather_tensor, dtype=gather_tensor.dtype, device=self.device) for _ in range(cur_group_size)] if self.rank == dst_rank else None + gather_list = [torch.zeros_like(gather_tensor, dtype=gather_tensor.dtype, device=self.device) for _ in + range(cur_group_size)] if self.rank == dst_rank else None dist.gather(gather_tensor, gather_list=gather_list, dst=dst_rank, group=group) end_event = create_npu_event(self.stream) transfer_time = event_elaspe_second(self.stream, start_event, end_event) @@ -61,7 +70,7 @@ class Collector: return gather_list, wait_time, transfer_time def create_sub_group(self, file_sizes_hash): - #需要根据file_sizes来划分sub_group ranks + # 需要根据file_sizes来划分sub_group ranks file_sizes = [item[0] for item in file_sizes_hash[self.master_rank_num:]] partitions = parition_sub_group_ranks(self.master_rank_num, file_sizes) self.logger.debug("[Rank %d] subgroup partiitons %s" % (self.rank, partitions)) @@ -72,21 +81,26 @@ class Collector: dist.barrier() start_event = create_npu_event(self.stream) wait_time = event_elaspe_second(self.stream, wait_event, start_event) - sub_group = dist.new_group(ranks = ranks, backend='hccl') + sub_group = dist.new_group(ranks=ranks, backend='hccl') end_event = create_npu_event(self.stream) - transfer_time = event_elaspe_second(self.stream,start_event, end_event) + transfer_time = event_elaspe_second(self.stream, start_event, end_event) - self.logger.info('[Rank %d] after new group, ranks: %s, file_sizes_hash %s' % (self.rank, ranks, file_sizes_hash)) + self.logger.info( + '[Rank %d] after new group, ranks: %s, file_sizes_hash %s' % (self.rank, ranks, file_sizes_hash)) cur_file_sizes = [file_sizes_hash[r].cpu().tolist()[0] for r in ranks[1:]] cur_file_hashes = [file_sizes_hash[r].cpu().tolist()[1:] for r in ranks[1:]] - GroupManager().add_rank_sub_group(sub_group=sub_group, ranks=ranks, file_sizes=cur_file_sizes, file_hashes=cur_file_hashes) + GroupManager().add_rank_sub_group(sub_group=sub_group, ranks=ranks, file_sizes=cur_file_sizes, + 
file_hashes=cur_file_hashes) else: self.logger.debug('[Rank %d] ranks %s not enough for creating subgroup' % (self.rank, ranks)) self.time_stat.init_pg_stat.sub_group_init = [wait_time, transfer_time] def bd_split_file_size(self, sub_group, split_size=None): - split_size_bd = torch.tensor([split_size], dtype=torch.int64, device=self.device) if self.rank == sub_group.master_rank else torch.zeros(1, dtype=torch.int64, device=self.device) + split_size_bd = torch.tensor([split_size], dtype=torch.int64, + device=self.device) if self.rank == sub_group.master_rank else torch.zeros(1, + dtype=torch.int64, + device=self.device) wait_event = create_npu_event(self.stream) dist.barrier(group=sub_group.group) start_event = create_npu_event(self.stream) @@ -102,24 +116,28 @@ class Collector: def gather_file_split(self, sub_group, tensor, output_file_dir=None): for i in range(sub_group.max_splits): - #is master node + # is master node if self.rank < self.master_rank_num: cur_tensor = torch.zeros(sub_group.split_file_size, dtype=torch.uint8, device=self.device) else: start_time = time.perf_counter() - cur_tensor = tensor[i*sub_group.split_file_size: (i+1)*sub_group.split_file_size] + cur_tensor = tensor[i * sub_group.split_file_size: (i + 1) * sub_group.split_file_size] if len(cur_tensor) < sub_group.split_file_size: - cur_tensor = np.pad(cur_tensor, (0, sub_group.split_file_size-len(cur_tensor)), 'constant', constant_values=0) + cur_tensor = np.pad(cur_tensor, (0, sub_group.split_file_size - len(cur_tensor)), 'constant', + constant_values=0) cur_tensor = torch.tensor(cur_tensor, dtype=torch.uint8, device=self.device) end_time = time.perf_counter() - self.time_stat.disk_stat.read_input_file_splits.append(end_time-start_time) - - #gather rank data内部有barrier与计时 - file_tensor_list, wait_time, transfer_time = self.gather_rank_data(dst_rank=sub_group.master_rank, group=sub_group.group, gather_tensor=cur_tensor) - self.logger.debug("[Rank %d] gather file split %d, wait time: %f, gather time: %f seconds" % (self.rank, i, wait_time, transfer_time)) + self.time_stat.disk_stat.read_input_file_splits.append(end_time - start_time) + + # gather rank data内部有barrier与计时 + file_tensor_list, wait_time, transfer_time = self.gather_rank_data(dst_rank=sub_group.master_rank, + group=sub_group.group, + gather_tensor=cur_tensor) + self.logger.debug("[Rank %d] gather file split %d, wait time: %f, gather time: %f seconds" % ( + self.rank, i, wait_time, transfer_time)) self.time_stat.com_stat.gather_file_splits.append([wait_time, transfer_time]) - #TODO: 需要记录从HBM刷到硬盘中的耗时 + # TODO: 需要记录从HBM刷到硬盘中的耗时 if file_tensor_list: master_rank_collect_dir = get_master_rank_collect_dir(output_file_dir, self.rank) hbm_ram_times = [] @@ -131,7 +149,7 @@ class Collector: if not os.path.exists(rank_dir): os.makedirs(rank_dir, exist_ok=True) rank_file = os.path.join(rank_dir, 'split_%d' % i) - cur_split_size = sub_group.splits[rank_i -1][i] + cur_split_size = sub_group.splits[rank_i - 1][i] if cur_split_size > 0: start_time = time.perf_counter() data = file_tensor_list[rank_i][:cur_split_size].cpu().numpy().tobytes() @@ -139,29 +157,28 @@ class Collector: with open(rank_file, 'wb') as f: f.write(data) end_time = time.perf_counter() - hbm_ram_times.append(ram_time-start_time) - ram_disk_times.append(end_time-ram_time) + hbm_ram_times.append(ram_time - start_time) + ram_disk_times.append(end_time - ram_time) self.time_stat.disk_stat.hbm_ram.append(hbm_ram_times) - self.time_stat.disk_stat.ram_disk.append(ram_disk_times) - + 
self.time_stat.disk_stat.ram_disk.append(ram_disk_times) + for tensor in file_tensor_list: del tensor del file_tensor_list torch.npu.empty_cache() - - + def concat_file_split(self, output_file_dir): cur_rank_collect_dir = get_master_rank_collect_dir(output_file_dir, self.rank) concat_times = [] verify_hash_times = [] for rank_i, rank in enumerate(self.sub_group.ranks): - #只提取slave rank的case + # 只提取slave rank的case if rank == self.rank: continue group_rank = rank - self.master_rank_num rank_dir = get_slave_rank_collect_dir(cur_rank_collect_dir, group_rank) - output_file_name = os.path.join(rank_dir, 'merge.tar') + output_file_name = os.path.join(rank_dir, 'merge.zip') # merge_file_size = 0 file_split_names = [] start_time = time.perf_counter() @@ -175,27 +192,29 @@ class Collector: cat_files(output_file_name, input_files=file_split_names) for file_split in file_split_names: os.remove(file_split) - + end_time = time.perf_counter() - concat_times.append(end_time-start_time) - self.logger.debug('[Rank %d] concate slave rank %s, time: %f seconds' % (self.rank, rank, end_time-start_time)) + concat_times.append(end_time - start_time) + self.logger.debug( + '[Rank %d] concate slave rank %s, time: %f seconds' % (self.rank, rank, end_time - start_time)) start_time = time.perf_counter() output_file_hash = get_quick_hash(output_file_name) - logger.debug('[Rank %d] rank_i %d, file_hashs:%s' % (self.rank,rank_i, self.sub_group.file_hashes)) - if not is_equal_file_hash(output_file_hash, self.sub_group.file_hashes[rank_i-1]): - self.logger.error('[Rank %d] Not equal merge file hash. %s. %s' % (self.rank, output_file_hash, self.sub_group.file_hashes[rank_i-1])) + self.logger.debug('[Rank %d] rank_i %d, file_hashs:%s' % (self.rank, rank_i, self.sub_group.file_hashes)) + if not is_equal_file_hash(output_file_hash, self.sub_group.file_hashes[rank_i - 1]): + self.logger.error('[Rank %d] Not equal merge file hash. %s. %s' % ( + self.rank, output_file_hash, self.sub_group.file_hashes[rank_i - 1])) end_time = time.perf_counter() - verify_hash_times.append(end_time-start_time) - - self.time_stat.disk_stat.hash_output_file = verify_hash_times + verify_hash_times.append(end_time - start_time) + + self.time_stat.disk_stat.hash_output_file = verify_hash_times self.time_stat.disk_stat.concat_file = concat_times - - def master_node_run(self,local_rank, output_file_dir, world_size, master_addr, master_port, master_rank_num, split_file_size=None): + def master_node_run(self, local_rank, output_file_dir, world_size, master_addr, master_port, master_rank_num, + split_file_size=None): try: - #设置环境变量,这些会在torch.dist中用到 - #因为master node rank为0, 所以global rank直接等于local rank + # 设置环境变量,这些会在torch.dist中用到 + # 因为master node rank为0, 所以global rank直接等于local rank os.environ["RANK"] = str(local_rank) os.environ["LOCAL_RANK"] = str(local_rank) os.environ["WORLD_SIZE"] = str(world_size) @@ -217,32 +236,35 @@ class Collector: self.logger.info("[Rank %d] master node run" % (self.rank)) # Step 2. Gather tensor size from slave node. 
- gather_tensor = torch.tensor([0,0,0,0,0,0,0,0,0], dtype=torch.int64, device=self.device) + gather_tensor = torch.tensor([0, 0, 0, 0, 0, 0, 0, 0, 0], dtype=torch.int64, device=self.device) # (file_size, file_hash) - file_sizes_hash, wait_time, transfer_time = self.gather_rank_data(group=dist.group.WORLD, gather_tensor=gather_tensor, all_gather=True) + file_sizes_hash, wait_time, transfer_time = self.gather_rank_data(group=dist.group.WORLD, + gather_tensor=gather_tensor, + all_gather=True) self.time_stat.com_stat.gather_file_size = [wait_time, transfer_time] - + self.logger.debug('[Rank %d] gather file size time %f seconds' % (self.rank, transfer_time)) - - #判断硬盘空间是否足够,解压的过程中需要额外的空间存储临时文件与原压缩包 + + # 判断硬盘空间是否足够,解压的过程中需要额外的空间存储临时文件与原压缩包 file_sizes = [item[0] for item in file_sizes_hash[self.master_rank_num:]] total_file_size = sum(file_sizes) - total_size_gb = Constant.UNZIP_DISK_SIZE_RAIO * total_file_size / (1024*1024*1024) + total_size_gb = Constant.UNZIP_DISK_SIZE_RAIO * total_file_size / (1024 * 1024 * 1024) - self.logger.debug('[Rank %d] collect file sizes %s, total size %fgb' % (self.rank, file_sizes, total_file_size)) - disk_free, expected_free_space_gb = DiskManager.get_disk_space(output_file_dir, total_size_gb) + self.logger.debug( + '[Rank %d] collect file sizes %s, total size %fgb' % (self.rank, file_sizes, total_file_size)) # Step 3. broadcast子通信域配置,建立子通信域 self.logger.info("[Rank %d] creating sub group %s" % (self.rank, file_sizes_hash)) self.create_sub_group(file_sizes_hash) self.sub_group = GroupManager().get_rank_sub_group(self.rank) - #以下进入每个子通信域特定的逻辑 + # 以下进入每个子通信域特定的逻辑 if self.sub_group: - self.logger.info("[Rank %d] Subgroup ranks %s, file_sizes %s" % (self.rank, self.sub_group.ranks, self.sub_group.file_sizes)) - - #未指定split file size的话,根据HBM/rank_num计算 + self.logger.info("[Rank %d] Subgroup ranks %s, file_sizes %s" % ( + self.rank, self.sub_group.ranks, self.sub_group.file_sizes)) + + # 未指定split file size的话,根据HBM/rank_num计算 if not split_file_size: split_file_size = math.floor(Constant.MASTER_RANK_HBM / (len(self.sub_group.ranks))) @@ -251,26 +273,26 @@ class Collector: # logging.debug('[Rank %d] bd split file size %d, time %f seconds' % (self.rank, split_file_size, event_elaspe_second(stream, create_sub_group_event, bd_split_file_size_event))) self.sub_group.split_size(split_file_size) - self.logger.info("[Rank %d] Subgroup split file size %s, splits %s" % (self.rank, self.sub_group.split_file_size, self.sub_group.splits)) + self.logger.info("[Rank %d] Subgroup split file size %s, splits %s" % ( + self.rank, self.sub_group.split_file_size, self.sub_group.splits)) self.gather_file_split(sub_group=self.sub_group, tensor=None, output_file_dir=output_file_dir) self.logger.debug("[Rank %d] start concat file split" % self.rank) self.concat_file_split(output_file_dir=output_file_dir) if len(self.sub_group.ranks) > 1: - logger.info(self.time_stat.to_string()) + self.logger.info(self.time_stat.to_string()) else: self.logger.info("[Rank %d] master rank not in sub group" % self.rank) dist.barrier() except Exception as e: - logger.error("[ERROR] %s", e, exc_info=True) + self.logger.error("[ERROR] %s", e, exc_info=True) raise e finally: dist.destroy_process_group() - def slave_node_run(self, input_file_dir, world_size, master_addr, master_port, master_rank_num, rank, node_rank): try: self.logger.debug('Enter slave node run wrapper') - #设置环境变量,这些会在torch.dist中用到 + # 设置环境变量,这些会在torch.dist中用到 os.environ["RANK"] = str(rank) os.environ["LOCAL_RANK"] = "0" os.environ["WORLD_SIZE"] = 
str(world_size) @@ -289,68 +311,91 @@ class Collector: self.logger.debug('[Rank %d] init process group time %f seconds' % (self.rank, elp_time)) self.logger.info('[Rank %d] Start slave node process' % self.rank) - + # Step2. 先压缩文件,统计文件大小,再进入到gather逻辑里 if os.path.isfile(input_file_dir): file_path = input_file_dir else: - #TODO: 在这里检查是否有目录写权限 + # TODO: 在这里检查是否有目录写权限 file_path = os.path.join(str(Path(input_file_dir).parent), 'compress.tar') + start_time = time.perf_counter() compress_directory(input_file_dir, file_path) - - #TODO: 计算file hash的速度可能需要优化。将读文件与计算file hash的时间抽出来 + end_time = time.perf_counter() + self.time_stat.disk_stat.compress_input_file = end_time - start_time + self.logger.info("[Rank %d] Compress directory time: %f seconds" % (self.rank, end_time - start_time)) + + # TODO: 计算file hash的速度可能需要优化。将读文件与计算file hash的时间抽出来 file_size = os.path.getsize(file_path) start_time = time.perf_counter() file_hash_chunks = get_quick_hash(file_path) end_time = time.perf_counter() self.time_stat.disk_stat.hash_input_file = end_time - start_time + self.logger.info("[Rank %d] Hash input file time: %f seconds" % (self.rank, end_time - start_time)) + file_hash_chunks.insert(0, file_size) + self.logger.info( + "[Rank %d] File hash chunks (first element is file size): %s" % (self.rank, file_hash_chunks)) gather_tensor = torch.tensor(file_hash_chunks, dtype=torch.int64, device=self.device) - file_sizes_hash, wait_time, transfer_time = self.gather_rank_data(group=dist.group.WORLD, gather_tensor=gather_tensor, all_gather=True) + file_sizes_hash, wait_time, transfer_time = self.gather_rank_data(group=dist.group.WORLD, + gather_tensor=gather_tensor, + all_gather=True) self.time_stat.com_stat.gather_file_size = [wait_time, transfer_time] - # logging.debug('[Rank %d] gather file size time %f seconds' % (self.rank, event_elaspe_second(stream, init_process_group_event, gather_file_size_event))) + self.logger.info("[Rank %d] Gather file size - wait time: %f seconds, transfer time: %f seconds" % ( + self.rank, wait_time, transfer_time)) - #Step3. 建立子通信域 + # Step3. 建立子通信域 self.logger.debug("[Rank %d] creating sub group %s" % (self.rank, file_sizes_hash)) self.create_sub_group(file_sizes_hash) self.sub_group = GroupManager().get_rank_sub_group(self.rank) - #进入每个子通信域特定的逻辑 + # 进入每个子通信域特定的逻辑 if self.sub_group: - #Step4. broacast split size大小 - self.logger.info("[Rank %d] Subgroup ranks %s, file_sizes %s" % (self.rank, self.sub_group.ranks, self.sub_group.file_sizes)) + # Step4. 
broacast split size大小 + self.logger.info("[Rank %d] Subgroup ranks %s, file_sizes %s" % ( + self.rank, self.sub_group.ranks, self.sub_group.file_sizes)) split_file_size = self.bd_split_file_size(self.sub_group) self.sub_group.split_size(split_file_size) file_tensor = np.memmap(file_path, dtype=np.uint8, mode='r') self.gather_file_split(sub_group=self.sub_group, tensor=file_tensor) - logger.info(self.time_stat.to_string()) + self.logger.info(self.time_stat.to_string()) else: self.logger.warning("[Rank %d] slave rank not in sub group" % (self.rank)) dist.barrier() - except Exception as e: + except Exception as e: self.logger.error("[ERROR] %s", e, exc_info=True) raise e finally: dist.destroy_process_group() - - def run(self, input_file_dir, output_file_dir, nnodes, node_rank, master_addr, master_port, master_rank_num, split_file_size, time_out, log_file): - logging.basicConfig( - filename=log_file, # File to write logs to - level=logging.DEBUG, # Minimum logging level to write to the file - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' # Log message format - ) - self.logger = logging.getLogger(__name__) - try: - #calculate world size + def run(self, input_file_dir, output_file_dir, nnodes, node_rank, master_addr, master_port, master_rank_num, + split_file_size, time_out, log_file): + self.logger.info({"message": "Run method arguments", + "class": self.__class__.__name__, + "method": sys._getframe().f_code.co_name, + "args": { + "input_file_dir": input_file_dir, + "output_file_dir": output_file_dir, + "nnodes": nnodes, + "node_rank": node_rank, + "master_addr": master_addr, + "master_port": master_port, + "master_rank_num": master_rank_num, + "split_file_size": split_file_size, + "time_out": time_out, + "log_file": log_file + }}) + + try: + # calculate world size world_size = nnodes + master_rank_num - 1 - #master node的逻辑 + # master node的逻辑 if node_rank == 0: processes = [] for i in range(master_rank_num): - process = mp.Process(target=self.master_node_run, args=(i, output_file_dir, world_size, master_addr, master_port, master_rank_num, split_file_size)) - logger.info("Start master node subprocess %d." % i) + process = mp.Process(target=self.master_node_run, args=( + i, output_file_dir, world_size, master_addr, master_port, master_rank_num, split_file_size)) + self.logger.info("Start master node subprocess %d." % i) process.start() processes.append(process) start_time = time.perf_counter() @@ -358,63 +403,65 @@ class Collector: while True: all_done = all(not process.is_alive() for process in processes) if all_done: - logger.info("All subprocesses finished successfully.") + self.logger.info("All subprocesses finished successfully.") break - #TODO: 检查当前output_file_dir路径是否有写权限 + # TODO: 检查当前output_file_dir路径是否有写权限 elapsed_time = time.perf_counter() - start_time time.sleep(5) if elapsed_time > time_out: raise TimeoutError("Timeout reached. 
Terminating all subprocesses.") except TimeoutError as e: - logger.error("%s", e, exc_info=True) + self.logger.error("%s", e, exc_info=True) for process in processes: if process.is_alive(): process.terminate() process.join() finally: - # Ensure all processes are cleaned up for process in processes: process.join() - #slave node的逻辑 + # slave node的逻辑 else: - rank = node_rank + master_rank_num -1 - self.slave_node_run(input_file_dir, world_size, master_addr, master_port, master_rank_num, rank, node_rank) + rank = node_rank + master_rank_num - 1 + self.slave_node_run(input_file_dir, world_size, master_addr, master_port, master_rank_num, rank, + node_rank) except Exception as e: - logger.error("%s", e, exc_info=True) + self.logger.error("[ERROR] %s", e, exc_info=True) raise e if __name__ == "__main__": - try: - parser = argparse.ArgumentParser() - parser.add_argument("--input_file_dir", type=str, help='input profiling data dir') - parser.add_argument("--output_file_dir", type=str, help='input profiling data dir') - parser.add_argument("--nnodes", type=int, help='the total node number') - parser.add_argument("--node_rank", type=int, help='node rank in the cluster') - parser.add_argument("--master_addr", type=str, help='master address') - parser.add_argument("--master_port", type=int, default=29501, help='master port') - parser.add_argument("--master_rank_num", type=int, default=8, help='master rank nums') - - #TODO: 有两个可能的切分mode。第一种(也就是目前实现的)是每个master rank同时传所有的子通信域的数据,根据HBM/slave_node_num 来确定split file size。第二种是指定好每次split file size, 然后根据HBM/split file size来指定slave node num, 通过排队的方式,依次建立子通信域。两种方式的效率需要在实际集群场景下进行测试。 - parser.add_argument("--split_file_size", type=int, default=None, help='split file size') - - #master node整体time out的时间 - parser.add_argument("--time_out", type=int, default=Constant.DEFAULT_TIME_OUT, help='totoal process time out in seconds') - parser.add_argument("--log_file", type=str, default=None, help='logging file') - args = parser.parse_args() - - logging.basicConfig( - filename=args.log_file, # File to write logs to - level=logging.DEBUG, # Minimum logging level to write to the file - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' # Log message format - ) - logger = logging.getLogger(__name__) - - collector = Collector() - logger.debug(vars(args)) - args_dict = vars(args) + parser = argparse.ArgumentParser() + parser.add_argument("--input_file_dir", type=str, help='input profiling data dir') + parser.add_argument("--output_file_dir", type=str, help='input profiling data dir') + parser.add_argument("--nnodes", type=int, help='the total node number') + parser.add_argument("--node_rank", type=int, help='node rank in the cluster') + parser.add_argument("--master_addr", type=str, help='master address') + parser.add_argument("--master_port", type=int, default=29501, help='master port') + parser.add_argument("--master_rank_num", type=int, default=8, help='master rank nums') + + # TODO: 有两个可能的切分mode。第一种(也就是目前实现的)是每个master rank同时传所有的子通信域的数据,根据HBM/slave_node_num 来确定split file size。第二种是指定好每次split file size, 然后根据HBM/split file size来指定slave node num, 通过排队的方式,依次建立子通信域。两种方式的效率需要在实际集群场景下进行测试。 + parser.add_argument("--split_file_size", type=int, default=None, help='split file size') + + # master node整体time out的时间 + parser.add_argument("--time_out", type=int, default=Constant.DEFAULT_TIME_OUT, + help='totoal process time out in seconds') + parser.add_argument("--log_file", type=str, default=None, help='logging file') + args = parser.parse_args() + + logging.basicConfig( + 
filename=args.log_file, # File to write logs to + level=logging.DEBUG, # Minimum logging level to write to the file + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' # Log message format + ) + logger = logging.getLogger(__name__) + + collector = Collector() + logger.debug(vars(args)) + args_dict = vars(args) + + try: collector.run(**args_dict) except Exception as e: - logger.error("%s", e, exc_info=True) + logger.error("[ERROR] %s", e, exc_info=True) raise e -- Gitee From 751919d825a2d3d90fb9706a3085ff7ce0100ada Mon Sep 17 00:00:00 2001 From: aosudh <2169159625@qq.com> Date: Wed, 4 Dec 2024 17:20:00 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E6=B7=BB=E5=8A=A0log?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- profiler/precheck/collect/collector.py | 226 +++++++++++-------------- 1 file changed, 99 insertions(+), 127 deletions(-) diff --git a/profiler/precheck/collect/collector.py b/profiler/precheck/collect/collector.py index f074fd62623..e28f5ecbc21 100644 --- a/profiler/precheck/collect/collector.py +++ b/profiler/precheck/collect/collector.py @@ -1,30 +1,26 @@ import sys -import os - -from profiler.precheck.common.constant import Constant -from profiler.precheck.common.time_stat import TimeStat -from profiler.precheck.common.utils import create_npu_event, event_elaspe_second, parition_sub_group_ranks, \ - get_master_rank_collect_dir, get_slave_rank_collect_dir, cat_files, is_equal_file_hash, get_quick_hash, \ - compress_directory - -sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) +sys.path.append("../../..") +import os import logging from pathlib import Path -from profiler.prof_common.path_manager import PathManager +import argparse +import traceback +import time +import math -import torch +import torch import torch_npu import torch.distributed as dist -from profiler.precheck.manager.group_manager import GroupManager import numpy as np import torch.multiprocessing as mp -import pdb -import argparse -import shutil -import traceback -import time -import math + +from profiler.prof_common.path_manager import PathManager +from profiler.precheck.manager.group_manager import GroupManager +from profiler.precheck.common.utils import * +from profiler.precheck.common.time_stat import * +from profiler.precheck.common.constant import * +from profiler.precheck.common.diskmanager import DiskManager class Collector: @@ -50,19 +46,16 @@ class Collector: def gather_rank_data(self, group, gather_tensor, all_gather=False, dst_rank=None) -> list: cur_group_size = dist.get_world_size(group) - self.logger.debug( - "[Rank %d] Local rank %d, gather data from %d ranks" % (self.rank, self.local_rank, cur_group_size)) + self.logger.debug("[Rank %d] Local rank %d, gather data from %d ranks" % (self.rank, self.local_rank, cur_group_size)) wait_event = create_npu_event(self.stream) dist.barrier(group=group) start_event = create_npu_event(self.stream) wait_time = event_elaspe_second(self.stream, wait_event, start_event) if all_gather: - gather_list = [torch.zeros_like(gather_tensor, dtype=gather_tensor.dtype, device=self.device) for _ in - range(cur_group_size)] + gather_list = [torch.zeros_like(gather_tensor, dtype=gather_tensor.dtype, device=self.device) for _ in range(cur_group_size)] dist.all_gather(gather_list, gather_tensor, group=group) else: - gather_list = [torch.zeros_like(gather_tensor, dtype=gather_tensor.dtype, device=self.device) for _ in - range(cur_group_size)] if self.rank == dst_rank else None + 
gather_list = [torch.zeros_like(gather_tensor, dtype=gather_tensor.dtype, device=self.device) for _ in range(cur_group_size)] if self.rank == dst_rank else None dist.gather(gather_tensor, gather_list=gather_list, dst=dst_rank, group=group) end_event = create_npu_event(self.stream) transfer_time = event_elaspe_second(self.stream, start_event, end_event) @@ -70,7 +63,7 @@ class Collector: return gather_list, wait_time, transfer_time def create_sub_group(self, file_sizes_hash): - # 需要根据file_sizes来划分sub_group ranks + #需要根据file_sizes来划分sub_group ranks file_sizes = [item[0] for item in file_sizes_hash[self.master_rank_num:]] partitions = parition_sub_group_ranks(self.master_rank_num, file_sizes) self.logger.debug("[Rank %d] subgroup partiitons %s" % (self.rank, partitions)) @@ -81,26 +74,21 @@ class Collector: dist.barrier() start_event = create_npu_event(self.stream) wait_time = event_elaspe_second(self.stream, wait_event, start_event) - sub_group = dist.new_group(ranks=ranks, backend='hccl') + sub_group = dist.new_group(ranks = ranks, backend='hccl') end_event = create_npu_event(self.stream) - transfer_time = event_elaspe_second(self.stream, start_event, end_event) + transfer_time = event_elaspe_second(self.stream,start_event, end_event) - self.logger.info( - '[Rank %d] after new group, ranks: %s, file_sizes_hash %s' % (self.rank, ranks, file_sizes_hash)) + self.logger.info('[Rank %d] after new group, ranks: %s, file_sizes_hash %s' % (self.rank, ranks, file_sizes_hash)) cur_file_sizes = [file_sizes_hash[r].cpu().tolist()[0] for r in ranks[1:]] cur_file_hashes = [file_sizes_hash[r].cpu().tolist()[1:] for r in ranks[1:]] - GroupManager().add_rank_sub_group(sub_group=sub_group, ranks=ranks, file_sizes=cur_file_sizes, - file_hashes=cur_file_hashes) + GroupManager().add_rank_sub_group(sub_group=sub_group, ranks=ranks, file_sizes=cur_file_sizes, file_hashes=cur_file_hashes) else: self.logger.debug('[Rank %d] ranks %s not enough for creating subgroup' % (self.rank, ranks)) self.time_stat.init_pg_stat.sub_group_init = [wait_time, transfer_time] def bd_split_file_size(self, sub_group, split_size=None): - split_size_bd = torch.tensor([split_size], dtype=torch.int64, - device=self.device) if self.rank == sub_group.master_rank else torch.zeros(1, - dtype=torch.int64, - device=self.device) + split_size_bd = torch.tensor([split_size], dtype=torch.int64, device=self.device) if self.rank == sub_group.master_rank else torch.zeros(1, dtype=torch.int64, device=self.device) wait_event = create_npu_event(self.stream) dist.barrier(group=sub_group.group) start_event = create_npu_event(self.stream) @@ -116,31 +104,27 @@ class Collector: def gather_file_split(self, sub_group, tensor, output_file_dir=None): for i in range(sub_group.max_splits): - # is master node + #is master node if self.rank < self.master_rank_num: cur_tensor = torch.zeros(sub_group.split_file_size, dtype=torch.uint8, device=self.device) else: start_time = time.perf_counter() - cur_tensor = tensor[i * sub_group.split_file_size: (i + 1) * sub_group.split_file_size] + cur_tensor = tensor[i*sub_group.split_file_size: (i+1)*sub_group.split_file_size] if len(cur_tensor) < sub_group.split_file_size: - cur_tensor = np.pad(cur_tensor, (0, sub_group.split_file_size - len(cur_tensor)), 'constant', - constant_values=0) + cur_tensor = np.pad(cur_tensor, (0, sub_group.split_file_size-len(cur_tensor)), 'constant', constant_values=0) cur_tensor = torch.tensor(cur_tensor, dtype=torch.uint8, device=self.device) end_time = time.perf_counter() - 
self.time_stat.disk_stat.read_input_file_splits.append(end_time - start_time) - - # gather rank data内部有barrier与计时 - file_tensor_list, wait_time, transfer_time = self.gather_rank_data(dst_rank=sub_group.master_rank, - group=sub_group.group, - gather_tensor=cur_tensor) - self.logger.debug("[Rank %d] gather file split %d, wait time: %f, gather time: %f seconds" % ( - self.rank, i, wait_time, transfer_time)) + self.time_stat.disk_stat.read_input_file_splits.append(end_time-start_time) + + #gather rank data内部有barrier与计时 + file_tensor_list, wait_time, transfer_time = self.gather_rank_data(dst_rank=sub_group.master_rank, group=sub_group.group, gather_tensor=cur_tensor) + self.logger.debug("[Rank %d] gather file split %d, wait time: %f, gather time: %f seconds" % (self.rank, i, wait_time, transfer_time)) self.time_stat.com_stat.gather_file_splits.append([wait_time, transfer_time]) - # TODO: 需要记录从HBM刷到硬盘中的耗时 + #记录从memory_on_chip刷到硬盘中的耗时 if file_tensor_list: master_rank_collect_dir = get_master_rank_collect_dir(output_file_dir, self.rank) - hbm_ram_times = [] + memory_on_chip_ram_times = [] ram_disk_times = [] for rank_i, rank in enumerate(sub_group.ranks): if rank != sub_group.master_rank: @@ -149,7 +133,7 @@ class Collector: if not os.path.exists(rank_dir): os.makedirs(rank_dir, exist_ok=True) rank_file = os.path.join(rank_dir, 'split_%d' % i) - cur_split_size = sub_group.splits[rank_i - 1][i] + cur_split_size = sub_group.splits[rank_i -1][i] if cur_split_size > 0: start_time = time.perf_counter() data = file_tensor_list[rank_i][:cur_split_size].cpu().numpy().tobytes() @@ -157,29 +141,29 @@ class Collector: with open(rank_file, 'wb') as f: f.write(data) end_time = time.perf_counter() - hbm_ram_times.append(ram_time - start_time) - ram_disk_times.append(end_time - ram_time) - - self.time_stat.disk_stat.hbm_ram.append(hbm_ram_times) - self.time_stat.disk_stat.ram_disk.append(ram_disk_times) + memory_on_chip_ram_times.append(ram_time-start_time) + ram_disk_times.append(end_time-ram_time) + self.time_stat.disk_stat.memory_on_chip.append(memory_on_chip_ram_times) + self.time_stat.disk_stat.ram_disk.append(ram_disk_times) + for tensor in file_tensor_list: del tensor del file_tensor_list torch.npu.empty_cache() - + + def concat_file_split(self, output_file_dir): cur_rank_collect_dir = get_master_rank_collect_dir(output_file_dir, self.rank) concat_times = [] verify_hash_times = [] for rank_i, rank in enumerate(self.sub_group.ranks): - # 只提取slave rank的case + #只提取slave rank的case if rank == self.rank: continue group_rank = rank - self.master_rank_num rank_dir = get_slave_rank_collect_dir(cur_rank_collect_dir, group_rank) output_file_name = os.path.join(rank_dir, 'merge.zip') - # merge_file_size = 0 file_split_names = [] start_time = time.perf_counter() with open(output_file_name, 'wb') as output_file: @@ -192,29 +176,27 @@ class Collector: cat_files(output_file_name, input_files=file_split_names) for file_split in file_split_names: os.remove(file_split) - + end_time = time.perf_counter() - concat_times.append(end_time - start_time) - self.logger.debug( - '[Rank %d] concate slave rank %s, time: %f seconds' % (self.rank, rank, end_time - start_time)) + concat_times.append(end_time-start_time) + self.logger.debug('[Rank %d] concate slave rank %s, time: %f seconds' % (self.rank, rank, end_time-start_time)) start_time = time.perf_counter() output_file_hash = get_quick_hash(output_file_name) - self.logger.debug('[Rank %d] rank_i %d, file_hashs:%s' % (self.rank, rank_i, self.sub_group.file_hashes)) - if not 
is_equal_file_hash(output_file_hash, self.sub_group.file_hashes[rank_i - 1]): - self.logger.error('[Rank %d] Not equal merge file hash. %s. %s' % ( - self.rank, output_file_hash, self.sub_group.file_hashes[rank_i - 1])) + self.logger.debug('[Rank %d] rank_i %d, file_hashs:%s' % (self.rank,rank_i, self.sub_group.file_hashes)) + if not is_equal_file_hash(output_file_hash, self.sub_group.file_hashes[rank_i-1]): + self.logger.error('[Rank %d] Not equal merge file hash. %s. %s' % (self.rank, output_file_hash, self.sub_group.file_hashes[rank_i-1])) end_time = time.perf_counter() - verify_hash_times.append(end_time - start_time) - - self.time_stat.disk_stat.hash_output_file = verify_hash_times + verify_hash_times.append(end_time-start_time) + + self.time_stat.disk_stat.hash_output_file = verify_hash_times self.time_stat.disk_stat.concat_file = concat_times - def master_node_run(self, local_rank, output_file_dir, world_size, master_addr, master_port, master_rank_num, - split_file_size=None): + + def master_node_run(self,local_rank, output_file_dir, world_size, master_addr, master_port, master_rank_num, split_file_size=None): try: - # 设置环境变量,这些会在torch.dist中用到 - # 因为master node rank为0, 所以global rank直接等于local rank + #设置环境变量,这些会在torch.dist中用到 + #因为master node rank为0, 所以global rank直接等于local rank os.environ["RANK"] = str(local_rank) os.environ["LOCAL_RANK"] = str(local_rank) os.environ["WORLD_SIZE"] = str(world_size) @@ -236,45 +218,40 @@ class Collector: self.logger.info("[Rank %d] master node run" % (self.rank)) # Step 2. Gather tensor size from slave node. - gather_tensor = torch.tensor([0, 0, 0, 0, 0, 0, 0, 0, 0], dtype=torch.int64, device=self.device) - # (file_size, file_hash) - file_sizes_hash, wait_time, transfer_time = self.gather_rank_data(group=dist.group.WORLD, - gather_tensor=gather_tensor, - all_gather=True) + gather_tensor = torch.tensor([0,0,0,0,0,0,0,0,0], dtype=torch.int64, device=self.device) + # 分为 (file_size, file_hash) + file_sizes_hash, wait_time, transfer_time = self.gather_rank_data(group=dist.group.WORLD, gather_tensor=gather_tensor, all_gather=True) self.time_stat.com_stat.gather_file_size = [wait_time, transfer_time] - + self.logger.debug('[Rank %d] gather file size time %f seconds' % (self.rank, transfer_time)) - - # 判断硬盘空间是否足够,解压的过程中需要额外的空间存储临时文件与原压缩包 + + #判断硬盘空间是否足够,解压的过程中需要额外的空间存储临时文件与原压缩包 file_sizes = [item[0] for item in file_sizes_hash[self.master_rank_num:]] total_file_size = sum(file_sizes) - total_size_gb = Constant.UNZIP_DISK_SIZE_RAIO * total_file_size / (1024 * 1024 * 1024) + total_size_gb = Constant.UNZIP_DISK_SIZE_RAIO * total_file_size / (1024*1024*1024) - self.logger.debug( - '[Rank %d] collect file sizes %s, total size %fgb' % (self.rank, file_sizes, total_file_size)) + self.logger.debug('[Rank %d] collect file sizes %s, total size %fgb' % (self.rank, file_sizes, total_file_size)) + disk_free, expected_free_space_gb = DiskManager.get_disk_space(output_file_dir, total_size_gb) # Step 3. 
broadcast子通信域配置,建立子通信域 self.logger.info("[Rank %d] creating sub group %s" % (self.rank, file_sizes_hash)) self.create_sub_group(file_sizes_hash) self.sub_group = GroupManager().get_rank_sub_group(self.rank) - # 以下进入每个子通信域特定的逻辑 + #以下进入每个子通信域特定的逻辑 if self.sub_group: - self.logger.info("[Rank %d] Subgroup ranks %s, file_sizes %s" % ( - self.rank, self.sub_group.ranks, self.sub_group.file_sizes)) - - # 未指定split file size的话,根据HBM/rank_num计算 + self.logger.info("[Rank %d] Subgroup ranks %s, file_sizes %s" % (self.rank, self.sub_group.ranks, self.sub_group.file_sizes)) + + #未指定split file size的话,根据memory_on_chip/rank_num计算 if not split_file_size: - split_file_size = math.floor(Constant.MASTER_RANK_HBM / (len(self.sub_group.ranks))) - + if len(self.sub_group.ranks) > 0: + split_file_size = math.floor(Constant.MASTER_RANK_MEMORY_ON_CHIP / (len(self.sub_group.ranks))) + else: + logger.error("Value of self.sub_group.ranks can not below 0") self.bd_split_file_size(self.sub_group, split_file_size) - # bd_split_file_size_event = create_npu_event(stream) - # logging.debug('[Rank %d] bd split file size %d, time %f seconds' % (self.rank, split_file_size, event_elaspe_second(stream, create_sub_group_event, bd_split_file_size_event))) - self.sub_group.split_size(split_file_size) - self.logger.info("[Rank %d] Subgroup split file size %s, splits %s" % ( - self.rank, self.sub_group.split_file_size, self.sub_group.splits)) + self.logger.info("[Rank %d] Subgroup split file size %s, splits %s" % (self.rank, self.sub_group.split_file_size, self.sub_group.splits)) self.gather_file_split(sub_group=self.sub_group, tensor=None, output_file_dir=output_file_dir) self.logger.debug("[Rank %d] start concat file split" % self.rank) self.concat_file_split(output_file_dir=output_file_dir) @@ -288,11 +265,12 @@ class Collector: raise e finally: dist.destroy_process_group() + def slave_node_run(self, input_file_dir, world_size, master_addr, master_port, master_rank_num, rank, node_rank): try: self.logger.debug('Enter slave node run wrapper') - # 设置环境变量,这些会在torch.dist中用到 + #设置环境变量,这些会在torch.dist中用到 os.environ["RANK"] = str(rank) os.environ["LOCAL_RANK"] = "0" os.environ["WORLD_SIZE"] = str(world_size) @@ -311,49 +289,42 @@ class Collector: self.logger.debug('[Rank %d] init process group time %f seconds' % (self.rank, elp_time)) self.logger.info('[Rank %d] Start slave node process' % self.rank) - + # Step2. 
             if os.path.isfile(input_file_dir):
                 file_path = input_file_dir
             else:
-                # TODO: check here whether the directory is writable
+                PathManager.check_path_writeable(input_file_dir)
                 file_path = os.path.join(str(Path(input_file_dir).parent), 'compress.tar')
                 start_time = time.perf_counter()
                 compress_directory(input_file_dir, file_path)
                 end_time = time.perf_counter()
                 self.time_stat.disk_stat.compress_input_file = end_time - start_time
                 self.logger.info("[Rank %d] Compress directory time: %f seconds" % (self.rank, end_time - start_time))
-
-            # TODO: computing the file hash may need optimization; separate the file-read time from the hash time
             file_size = os.path.getsize(file_path)
             start_time = time.perf_counter()
             file_hash_chunks = get_quick_hash(file_path)
             end_time = time.perf_counter()
             self.time_stat.disk_stat.hash_input_file = end_time - start_time
             self.logger.info("[Rank %d] Hash input file time: %f seconds" % (self.rank, end_time - start_time))
-
             file_hash_chunks.insert(0, file_size)
             self.logger.info(
                 "[Rank %d] File hash chunks (first element is file size): %s" % (self.rank, file_hash_chunks))
             gather_tensor = torch.tensor(file_hash_chunks, dtype=torch.int64, device=self.device)
-            file_sizes_hash, wait_time, transfer_time = self.gather_rank_data(group=dist.group.WORLD,
-                                                                              gather_tensor=gather_tensor,
-                                                                              all_gather=True)
+            file_sizes_hash, wait_time, transfer_time = self.gather_rank_data(group=dist.group.WORLD, gather_tensor=gather_tensor, all_gather=True)
             self.time_stat.com_stat.gather_file_size = [wait_time, transfer_time]
             self.logger.info("[Rank %d] Gather file size - wait time: %f seconds, transfer time: %f seconds" % (
                 self.rank, wait_time, transfer_time))
-
-            # Step3. Create the sub-groups
+            #Step3. Create the sub-groups
             self.logger.debug("[Rank %d] creating sub group %s" % (self.rank, file_sizes_hash))
             self.create_sub_group(file_sizes_hash)
             self.sub_group = GroupManager().get_rank_sub_group(self.rank)
-            # Enter the logic specific to each sub-group
+            #Enter the logic specific to each sub-group
             if self.sub_group:
-                # Step4. Broadcast the split size
-                self.logger.info("[Rank %d] Subgroup ranks %s, file_sizes %s" % (
-                    self.rank, self.sub_group.ranks, self.sub_group.file_sizes))
+                #Step4. Broadcast the split size
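+                # Slave ranks pass no local value here; bd_split_file_size returns the split size broadcast by the sub-group master rank.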
+                self.logger.info("[Rank %d] Subgroup ranks %s, file_sizes %s" % (self.rank, self.sub_group.ranks, self.sub_group.file_sizes))
                 split_file_size = self.bd_split_file_size(self.sub_group)
                 self.sub_group.split_size(split_file_size)
                 file_tensor = np.memmap(file_path, dtype=np.uint8, mode='r')
@@ -362,14 +333,18 @@ class Collector:
             else:
                 self.logger.warning("[Rank %d] slave rank not in sub group" % (self.rank))
                 dist.barrier()
-        except Exception as e:
+        except Exception as e: 
             self.logger.error("[ERROR] %s", e, exc_info=True)
             raise e
         finally:
             dist.destroy_process_group()
-
-    def run(self, input_file_dir, output_file_dir, nnodes, node_rank, master_addr, master_port, master_rank_num,
-            split_file_size, time_out, log_file):
+
+    def run(self, input_file_dir, output_file_dir, nnodes, node_rank, master_addr, master_port, master_rank_num, split_file_size, time_out, log_file):
+        logging.basicConfig(
+            filename=log_file, # File to write logs to
+            level=logging.DEBUG, # Minimum logging level to write to the file
+            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' # Log message format
+        )
         self.logger.info({"message": "Run method arguments",
                           "class": self.__class__.__name__,
                           "method": sys._getframe().f_code.co_name,
@@ -386,15 +361,14 @@ class Collector:
                           "log_file": log_file
                           }})
-        try:
-            # calculate world size
+        try: 
+            # calculate world size
            world_size = nnodes + master_rank_num - 1
-            # master node logic
+            #master node logic
             if node_rank == 0:
                 processes = []
                 for i in range(master_rank_num):
-                    process = mp.Process(target=self.master_node_run, args=(
-                        i, output_file_dir, world_size, master_addr, master_port, master_rank_num, split_file_size))
+                    process = mp.Process(target=self.master_node_run, args=(i, output_file_dir, world_size, master_addr, master_port, master_rank_num, split_file_size))
                     self.logger.info("Start master node subprocess %d." % i)
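+                    # One subprocess per master-node NPU: each one sets its own RANK/LOCAL_RANK and initializes its own process group in master_node_run.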
                     process.start()
                     processes.append(process)
@@ -405,7 +379,6 @@ class Collector:
                         if all_done:
                             self.logger.info("All subprocesses finished successfully.")
                             break
-                        # TODO: check whether the current output_file_dir path is writable
                         elapsed_time = time.perf_counter() - start_time
                         time.sleep(5)
                         if elapsed_time > time_out:
@@ -418,13 +391,13 @@ class Collector:
                             process.terminate()
                             process.join()
                 finally:
+                    # Ensure all processes are cleaned up
                     for process in processes:
                         process.join()
-            # slave node logic
+            #slave node logic
             else:
-                rank = node_rank + master_rank_num - 1
-                self.slave_node_run(input_file_dir, world_size, master_addr, master_port, master_rank_num, rank,
-                                    node_rank)
+                rank = node_rank + master_rank_num -1
+                self.slave_node_run(input_file_dir, world_size, master_addr, master_port, master_rank_num, rank, node_rank)
         except Exception as e:
             self.logger.error("[ERROR] %s", e, exc_info=True)
             raise e
@@ -440,7 +413,6 @@ if __name__ == "__main__":
     parser.add_argument("--master_port", type=int, default=29501, help='master port')
     parser.add_argument("--master_rank_num", type=int, default=8, help='master rank nums')
-    # TODO: There are two possible split modes. The first (the one implemented here) has every master rank transfer the data of all sub-groups at once, with the split file size derived from HBM / slave_node_num. The second fixes the split file size up front and derives the slave node count from HBM / split file size, building the sub-groups one after another in a queue. The efficiency of the two approaches needs to be measured on a real cluster.
     parser.add_argument("--split_file_size", type=int, default=None, help='split file size')
     # overall time out for the master node
@@ -460,7 +432,7 @@ if __name__ == "__main__":
     logger.debug(vars(args))
     args_dict = vars(args)
-    try:
+    try: 
         collector.run(**args_dict)
     except Exception as e:
         logger.error("[ERROR] %s", e, exc_info=True)
-- 
Gitee

From ed3fcff9834d8e653cfc7b5e80eef6d2ba7969a3 Mon Sep 17 00:00:00 2001
From: aosudh <2169159625@qq.com>
Date: Wed, 4 Dec 2024 17:41:39 +0800
Subject: [PATCH 4/4] =?UTF-8?q?=E4=BF=AE=E6=94=B9cleancode?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 profiler/precheck/collect/collector.py | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/profiler/precheck/collect/collector.py b/profiler/precheck/collect/collector.py
index e28f5ecbc21..632ce31cdb3 100644
--- a/profiler/precheck/collect/collector.py
+++ b/profiler/precheck/collect/collector.py
@@ -1,5 +1,5 @@
 import sys
-sys.path.append("../../..")
+import os
+sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
 
-import os
 import logging
@@ -17,9 +17,11 @@ import torch.multiprocessing as mp
 from profiler.prof_common.path_manager import PathManager
 
 from profiler.precheck.manager.group_manager import GroupManager
-from profiler.precheck.common.utils import *
-from profiler.precheck.common.time_stat import *
-from profiler.precheck.common.constant import *
+from profiler.precheck.common.constant import Constant
+from profiler.precheck.common.time_stat import TimeStat
+from profiler.precheck.common.utils import create_npu_event, event_elaspe_second, parition_sub_group_ranks, \
+    get_master_rank_collect_dir, get_slave_rank_collect_dir, cat_files, is_equal_file_hash, get_quick_hash, \
+    compress_directory
 from profiler.precheck.common.diskmanager import DiskManager
-- 
Gitee
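
A minimal sketch of the rank layout that run() sets up, assuming the parser default master_rank_num=8 and an illustrative nnodes=3 (the node count is an example, not taken from the patches):

    # Example only: how global ranks are derived in run()
    master_rank_num = 8                             # parser default in the __main__ block
    nnodes = 3                                      # example cluster size
    world_size = nnodes + master_rank_num - 1       # 10 ranks in total
    master_ranks = list(range(master_rank_num))     # ranks 0..7, one subprocess per master-node NPU
    slave_ranks = [node_rank + master_rank_num - 1  # node 1 -> rank 8, node 2 -> rank 9
                   for node_rank in range(1, nnodes)]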