From 7e019393dc72b31cb4848c6620aacfab25fd1f66 Mon Sep 17 00:00:00 2001 From: zbl Date: Tue, 11 Mar 2025 22:45:08 +0800 Subject: [PATCH] communication && IO check --- .DS_Store | Bin 0 -> 6148 bytes profiler/.DS_Store | Bin 0 -> 8196 bytes profiler/msprof_analyze/.DS_Store | Bin 0 -> 8196 bytes profiler/msprof_analyze/precheck/.DS_Store | Bin 0 -> 8196 bytes .../precheck/env_check/analyze.py | 35 +++ .../precheck/env_check/communication_check.py | 227 +++++++++++++++++- .../precheck/env_check/io_check.py | 169 ++++++++++++- 7 files changed, 426 insertions(+), 5 deletions(-) create mode 100644 .DS_Store create mode 100644 profiler/.DS_Store create mode 100644 profiler/msprof_analyze/.DS_Store create mode 100644 profiler/msprof_analyze/precheck/.DS_Store create mode 100644 profiler/msprof_analyze/precheck/env_check/analyze.py diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..8ca3a8fb5968dab3de119e87c47f7d3436655a36 GIT binary patch literal 6148 zcmeHK%Wl&^6ur}?jg#`o0*P*ryg_WDDs8IL4N^#xMWj+%T<8K&YBw)St{wS7R6-#8 zegQ1_1-^qX;78c7f-?`0;}kYX2t{+HnKO@b=W%8{;~^qePdb}KB_c8qIOd9ot}tHD z!<;Rtl50RF+UU2u;LPy?XO622v;taz|4#w_?XFOVy5vwo>F<{x37-%)z6%pWUjy@k zd{WV9Ctx<|30WELw+uIr-aS5U zaxs!7hk z!KfZ2ekT&`ShB(}3Pzz_S|5+M?(de(tx9!LHpjcw?XtOD-I+`Z#_hYC4_c?^7q7=} zCU4(kQZR-CTG&m6Kj8xsbHco`Lf;GgKI*t%0ZY+@oDF&e+@N#n%t|MgJT2WSMP!4= zm#3TI@&!+G@6j<0up}WiSDOyu7gCIE*2YE{VwrrTEG%1s@-yURt_RLYaCylA>SMHl z`b=+p%cOfN9(nJM=qYBjfBCg{ z))Y=9ilYOOcme=(C>Dk^|8d|xrox)SsYKMkm~;h7SEimAOuD1oRJ@wPsYK~cOg%oB z>Y1q*3KPAfeN)hh)g!fp90jP(&h))Y=9q6DUY1cVGa(hB@j1%3d+HRMMC literal 0 HcmV?d00001 diff --git a/profiler/.DS_Store b/profiler/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..207a233c75202f3540e1055e330f46def4ce2ae6 GIT binary patch literal 8196 zcmeHLPfrs;6n}##+oG}{NHLmhY)nib281XVLM<3=i~%h{5Vh{ML*2NYX?C|jK}_$S zJZjk^KkPqNP3Nlx3e9K~FbcFQAa0v(S+o1V_SNt8T)>N7 zz&(r*H`OKV=f28@7Cnh5(f6d|KUd{-j@C~F^NVYG)8tKa(;scS zx$Q)<`|y#YJ$7&3vEKgPfq~s<m=!zPZ)ah{j8M?B#;p6BR=q1oJ%d zW1q9-I}u)rTuduDw#sqEHI#m?DyE`E8b*k1oTX(q3|&^H+4|kU^=L51vOJD?MMR2h zMb73!Dl#qkXp6n1r#~xc)0t~cI-7>#zt9 z@eBJDUcgIu4R7HCe1b2eiwu#|WSC5lDKbs2k{jeESs=^gHd#^pj^>pXx7nUXN_gXAG>HGuAxv;aFFFVqWmF%sLOE~>tLwhWHdGuq6eG#BS6Yv2BW|q GRp1-T<0~it literal 0 HcmV?d00001 diff --git a/profiler/msprof_analyze/.DS_Store b/profiler/msprof_analyze/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..56ccf3bf95d72fe3a2f9b2a624016d4f45491454 GIT binary patch literal 8196 zcmeHMO-~a+7=A}8Y+H~XNZ?|!7jJ~{73E@53pF88G%k^YnzE$}Y;1RGcT0s@(!0Mv z5B>#zhrhso;oXxy^I@|)rCv-72{UBonVo%~`FQ4?-M$MEu}ZhGMzlmk4zj>f5ycfr z+!uK)-O??KKn6ZhND0;Ow?gePYujKIunJfOtO8a6tH5~DLOGdGSd+XlcR&@N^@dmg|@T`SOvxv5V3nv#_S>tiKx8Q*=a43bR)1u3u^FvbJ*FUoz`{Lc6q0RoWwus)n^#ciOs0 zPtn637>~j5XdmB3pmdfcbx5-ounG=!yO_IJhf=vKl?dqqn4lLv){jsm-A@5FF|-h0 zWfGu|b($&)Ez+E6dPWDBag|<>mz}53g561J;XrGl^bwuP@~vSHHU4g2@5OEHW8doS zW*8@{tKZqJsod@98OJF&g}K5T@09nv#80|$#qYclSI_oHs{xzy2}|dbQHR zHjffNjDn6Nhm8OxuMfhg!Fv_njT*7+$5S^PC-3BIrKNs<{lV6RTYI>B*gC)He;8bTf?!BYzlmsGU60DosErA8=5?Ybk4^*;xhRF>aJZUw zTCw3*xJ5?3*%T|F_qf8r!^WEK4L8W}^4Q8MHg5;cScL<&sFB?$1}DfcX63fNg9C=L z*24Bi<0DCjxJ)@jX%Y{kCYW)O9hZcIr1^Pq%wgHZ+E$>XcOiy4#bbt`bSP+D^`K)uE3O<;8GD-o&5d(x-Y}d z&njRQ_y+}8ZpYiHpo2fZXw1GutnDHnB8w#Kh6*JGg$&1mG8_kv{$Yr^3sb= T3)27p5MbZ`Qg~b3Xa#-)xgeDs literal 0 HcmV?d00001 diff --git a/profiler/msprof_analyze/precheck/.DS_Store b/profiler/msprof_analyze/precheck/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..317babd10ce50122fb52f0035f1dc564a11cd03a GIT binary patch literal 8196 zcmeHM%Wl&^6ur|&;jEQV8v$3nWAkEC8i(6EzmjxWrCF8-lXu z3t+)7@Ev>s-@*#cy)#6ylM1mxRLn#(XFNV<#&dk_>9|CsYW>z0(HaqX$c*Jx6mt@v z=RA`ZY|C|^f}SX(lp6SL(8&zjH8=;H1I_{GfOEh(@Gm%kHJg)n!MbnGy3#q|9QZFC z;P-=%%viNJHYl$S6tV>XE}>Zx>evT}#QKE zBPV6#r0mGb?ogDB4xKM_QdNVlbPhNNW*y+Y`>O2OL(-Z3``JMh9|chyfWZ&d1LTb# zl2H4l>QWObJ=yi=RbO(dBKR6PqBAc};|&MIW~zflB(UK}WVXFQKEZsK-Xg@JDUXG2_a4qPG! z7R|M;@&14R;_v^LXh!Z*odeE+KXpJY?bY{cP|2!3N6o9ek9>&CiFsp#B0-V)LjcE< ce;A_fgUXt!#j!!OAh{0#VuLH31Ao 1: + group_names_list.append("dp") + group_info_dict["dp"] = [self.dcb.get_data_parallel_group_ranks(), + self.dcb.get_data_parallel_group()] + if self.tp > 1: + group_names_list.append("tp") + group_info_dict["tp"] = [self.dcb.get_tensor_parallel_group_ranks(), + self.dcb.get_tensor_parallel_group()] + if self.pp > 1: + group_names_list.append("pp") + group_info_dict["pp"] = [self.dcb.get_pipeline_parallel_group_ranks(), + self.dcb.get_pipeline_parallel_group()] + if self.ep > 1: + group_names_list.append("ep") + group_info_dict["ep"] = [self.dcb.get_expert_parallel_group_ranks(), + self.dcb.get_expert_parallel_group()] + if self.cp > 1: + group_names_list.append("cp") + group_info_dict["cp"] = [self.dcb.get_context_parallel_group_ranks(), + self.dcb.get_context_parallel_group()] + return group_names_list, group_info_dict + + def p2p_comm(self, iteration: int = 5, batch_size=(32, 4096, 8192)): + """Perform point-to-point communication between ranks in the group""" + group_names_list, group_info_dict = self.comm_group_create() + local_cost = {"rank": dist.get_rank()} + if len(group_names_list) == 0: + raise ValueError("The tensor-parallel, pipeline-parallel, data-parallel, " + "expert-parallel, context-parallel is set less than 2, " + "no peer communication can be created") + tensor_send = torch.randn(batch_size, dtype=torch.float32).npu() + tensor_recv = torch.empty(batch_size, dtype=torch.float32).npu() + for group_name in group_names_list: + ranks, proc_group = group_info_dict[group_name] + local_cost[group_name] = [] + if len(ranks) <= 1: + raise ValueError(f"Failed to start communication group {group_name}," + f"since the group is {ranks}.") + for i in range(iteration): + if dist.get_rank() == 0: + logging.info(f">>Start communication: {group_name}, iteration: {i}") + tensor_send.uniform_(-1, 1) + if i == 0: + dist.barrier() + timer = Timer() + timer.start() + self.perform_p2p_communication_async(ranks, tensor_send, tensor_recv, proc_group) + timer.stop() + local_cost[group_name].append(timer.delta) + local_cost[f"{group_name}_avg_cost"] = sum(local_cost[group_name]) / iteration + self.dump_file(local_cost, self.output) + if self.no_share_storage: + self.collect_data() + + def get_file_path(self, file_path, file_name): + """get the file path of the given file name""" + if not os.path.exists(file_path): + logging.info(f"path is not exist, creating output dir: {file_path}") + os.makedirs(file_path) + return os.path.join(file_path, file_name) + + def dump_file(self, data, save_dir): + """save data to file as worker_x_rank_y.json""" + cur_rank = dist.get_rank() + worker_id = os.getenv("GROUP_RANK") + if worker_id is None: + raise ValueError("GROUP_ID environment variable is not set.") + if data is None: + raise ValueError(f"data is not created by rank {cur_rank}, please check this") + dump_path = self.get_file_path(self.tmp_path, f"worker_{worker_id}_rank_{cur_rank}.json") + try: + with open(dump_path, 'w') as f: + json.dump(data, f, indent=4) + except Exception as e: + logging.error(f"An error occurred while rank {cur_rank} saving data to {dump_path}: {e}") + + def collect_data(self): + """ + 将全部节点搜集的日志汇聚到master节点,共享存储不需要该操作 + """ + worker_id = os.getenv("GROUP_RANK") + if worker_id is None: + raise ValueError("GROUP_ID environment variable is not set.") + send_file_dir = self.get_file_path(self.tmp_path, f'worker_{worker_id}_rank_{self.rank}.json') + receive_file_dir = os.path.join(self.output, 'collected_data') + if self.rank == 0: + logging.info(f"master node {self.rank} is collecting data from other nodes") + self.dcb.collect_global_info(send_file_dir, receive_file_dir, time_out=1800, log_file="./comm_test.log") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--world_size", type=int, default=16, required=True) + parser.add_argument("--output", type=str, default="./env_check.json") + parser.add_argument("--tensor_model_parallel_size", type=int, default=4, required=True) + parser.add_argument("--pipeline_model_parallel_size", type=int, default=4, required=True) + parser.add_argument("--context_model_parallel_size", type=int, default=1, required=True) + parser.add_argument("--expert_model_parallel_size", type=int, default=1, required=True) + args = parser.parse_args() - def check(self): - pass + comm_check = CommunicationCheck(args=args) + comm_check.p2p_comm() + comm_check.comm_group_destroy() \ No newline at end of file diff --git a/profiler/msprof_analyze/precheck/env_check/io_check.py b/profiler/msprof_analyze/precheck/env_check/io_check.py index 35780cb63b..7ea9a4f53c 100644 --- a/profiler/msprof_analyze/precheck/env_check/io_check.py +++ b/profiler/msprof_analyze/precheck/env_check/io_check.py @@ -12,6 +12,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +import os +import time +import logging +import shutil +import json +import numpy as np from msprof_analyze.precheck.env_check.environment_check import HardwareCheck @@ -20,6 +27,164 @@ class IOCheck(HardwareCheck): def __init__(self, args): super().__init__(args) + self.base_dir = './data_file' + self.global_worker_id = os.getenv("GROUP_RANK") + self.local_rank = os.getenv("LOCAL_RANK") + self.ensure_base_dir_exists() + self.output = args.output if args is not None else './results.json' + self.results = {'local_rank': self.local_rank, + 'global_rank': self.global_worker_id, + 'read_cost': { + 'ckpt_read': 0, + 'log_read': 0, + }, + 'write_cost': { + 'ckpt_write': 0, + 'log_write': 0, + }} + + def ensure_base_dir_exists(self): + """确保数据目录存在""" + if not os.path.exists(self.base_dir): + os.makedirs(self.base_dir) + + def generate_log_message(self, index): + """生成日志消息""" + return (f"this is the create txt file for IO check in cluster, you can ignore this information " + f"and check the result in another file [{time.strftime('%Y-%m-%d %H:%M:%S')}] " + f"这是第 {index + 1} 条日志信息。\n") + + def get_file_path(self, index, file_type): + """生成文件路径""" + if file_type == "txt": + return os.path.join(self.base_dir, f'worker_{self.global_worker_id}_data_{index}.txt') + elif file_type == "npy": + return os.path.join(self.base_dir, f'worker_{self.global_worker_id}_data_{index}.npy') + else: + raise ValueError(f"file type {file_type} is not included in the list [txt, npy]") + + def is_local_rank_zero(self): + """检查本地排名是否为 0""" + return self.local_rank is not None and int(self.local_rank) == 0 + + def calculate_speed(self, start_time, end_time, data_size): + """计算读写速度""" + elapsed_time = end_time - start_time + return data_size / elapsed_time / (1024 ** 2) + + def generate_random_weights(self, shape=(4, 2048, 4096)): + """生成随机权重""" + return np.random.randn(*shape).astype(np.float32) + + def generate_file_like_log(self, iteration: int = 10000, scaler: int = 100): + """生成数据文件并计算写入速度""" + index = self.local_rank + file_path = self.get_file_path(index, file_type='txt') + start_time = time.time() + total_data_size = 0 + + with open(file_path, 'w', encoding='utf-8') as file: + for i in range(iteration): + log_message = self.generate_log_message(i) * scaler + file.write(log_message) + file.flush() + total_data_size += len(log_message) + end_time = time.time() + write_speed = self.calculate_speed(start_time, end_time, total_data_size) + self.results['write_cost']['log_write'] = write_speed + return file_path + + def read_file_like_log(self, file_path=None): + """读取单个文件并计算读取速度""" + if file_path is None: + file_path = self.get_file_path(self.local_rank, file_type='txt') + try: + start_time = time.time() + data = "" + with open(file_path, 'r', encoding='utf-8') as file: + for line in file: + data += line + end_time = time.time() + data_size = len(data.encode('utf-8')) + read_speed = self.calculate_speed(start_time, end_time, data_size) + self.results['read_cost']['log_read'] = read_speed + return data + except FileNotFoundError: + logging.error(f"File {file_path} not found.") + except UnicodeDecodeError: + logging.error(f"Error decoding {file_path} as UTF-8.") + except Exception as e: + logging.error(f"Unexpected error reading {file_path}: {e}") + return None + + def generate_file_like_ckpt(self): + """生成大数据文件并计算写入速度""" + index = self.local_rank + file_path = self.get_file_path(index, file_type='npy') + weight = self.generate_random_weights() + total_data_size = weight.size * weight.itemsize + start_time = time.time() + np.save(file_path, weight) + end_time = time.time() + write_speed = self.calculate_speed(start_time, end_time, total_data_size) + self.results['write_cost']['ckpt_write'] = write_speed + return file_path + + def read_file_like_ckpt(self, file_path=None): + """读取大的单个文件并计算读取速度""" + if file_path is None: + file_path = self.get_file_path(self.local_rank, file_type='npy') + try: + start_time = time.time() + data = np.load(file_path) + end_time = time.time() + data_size = data.nbytes + read_speed = self.calculate_speed(start_time, end_time, data_size) + self.results['read_cost']['ckpt_read'] = read_speed + return data + except FileNotFoundError: + logging.error(f"File {file_path} not found.") + except UnicodeDecodeError: + logging.error(f"Error decoding {file_path} as UTF-8.") + except Exception as e: + logging.error(f"Unexpected error reading {file_path}: {e}") + return None + + def clean_cache_file(self, file_type=None): + """执行完删除缓存文件,避免磁盘空间不足""" + if file_type == 'txt': + file_path = self.get_file_path(self.local_rank, file_type='txt') + elif file_type == 'npy': + file_path = self.get_file_path(self.local_rank, file_type='npy') + else: + raise ValueError(f"no such file type {file_type} could be clean as cache file loaded temperaly") + try: + os.remove(file_path) + except FileNotFoundError: + logging.error(f"File {file_path} not found, cannot remove.") + except PermissionError: + logging.error(f"Permission denied when trying to remove {file_path}.") + except Exception as e: + logging.error(f"Unexpected error removing {file_path}: {e}") + + def dump_results(self): + """生成临时结果用于分析""" + try: + with open(self.output, 'w') as f: + json.dump(self.results, f, indent=4) + logging.info(f"Results have been saved to {self.output}") + except Exception as e: + logging.error(f"Error saving results to {self.output}: {e}") + + + + - def check(self): - pass +if __name__ == "__main__": + io = IOCheck() + io.generate_file_like_log() + io.read_file_like_log() + io.generate_file_like_ckpt() + io.read_file_like_ckpt() + io.clean_cache_file('txt') + io.clean_cache_file('npy') \ No newline at end of file -- Gitee