From 7e019393dc72b31cb4848c6620aacfab25fd1f66 Mon Sep 17 00:00:00 2001
From: zbl
Date: Tue, 11 Mar 2025 22:45:08 +0800
Subject: [PATCH 1/6] communication && IO check

---
 .DS_Store                                          | Bin 0 -> 6148 bytes
 profiler/.DS_Store                                 | Bin 0 -> 8196 bytes
 profiler/msprof_analyze/.DS_Store                  | Bin 0 -> 8196 bytes
 profiler/msprof_analyze/precheck/.DS_Store         | Bin 0 -> 8196 bytes
 .../precheck/env_check/analyze.py                  |  35 +++
 .../precheck/env_check/communication_check.py      | 227 +++++++++++++++++-
 .../precheck/env_check/io_check.py                 | 169 ++++++++++++-
 7 files changed, 426 insertions(+), 5 deletions(-)
 create mode 100644 .DS_Store
 create mode 100644 profiler/.DS_Store
 create mode 100644 profiler/msprof_analyze/.DS_Store
 create mode 100644 profiler/msprof_analyze/precheck/.DS_Store
 create mode 100644 profiler/msprof_analyze/precheck/env_check/analyze.py

diff --git a/.DS_Store b/.DS_Store
new file mode 100644
index 0000000000000000000000000000000000000000..8ca3a8fb5968dab3de119e87c47f7d3436655a36
GIT binary patch
literal 6148 (binary .DS_Store data not shown)
literal 0
HcmV?d00001

diff --git a/profiler/.DS_Store b/profiler/.DS_Store
new file mode 100644
index 0000000000000000000000000000000000000000..207a233c75202f3540e1055e330f46def4ce2ae6
GIT binary patch
literal 8196 (binary .DS_Store data not shown)
literal 0
HcmV?d00001

diff --git a/profiler/msprof_analyze/.DS_Store b/profiler/msprof_analyze/.DS_Store
new file mode 100644
index 0000000000000000000000000000000000000000..56ccf3bf95d72fe3a2f9b2a624016d4f45491454
GIT binary patch
literal 8196 (binary .DS_Store data not shown)
literal 0
HcmV?d00001

diff --git a/profiler/msprof_analyze/precheck/.DS_Store b/profiler/msprof_analyze/precheck/.DS_Store
new file mode 100644
index
0000000000000000000000000000000000000000..317babd10ce50122fb52f0035f1dc564a11cd03a GIT binary patch literal 8196 zcmeHM%Wl&^6ur|&;jEQV8v$3nWAkEC8i(6EzmjxWrCF8-lXu z3t+)7@Ev>s-@*#cy)#6ylM1mxRLn#(XFNV<#&dk_>9|CsYW>z0(HaqX$c*Jx6mt@v z=RA`ZY|C|^f}SX(lp6SL(8&zjH8=;H1I_{GfOEh(@Gm%kHJg)n!MbnGy3#q|9QZFC z;P-=%%viNJHYl$S6tV>XE}>Zx>evT}#QKE zBPV6#r0mGb?ogDB4xKM_QdNVlbPhNNW*y+Y`>O2OL(-Z3``JMh9|chyfWZ&d1LTb# zl2H4l>QWObJ=yi=RbO(dBKR6PqBAc};|&MIW~zflB(UK}WVXFQKEZsK-Xg@JDUXG2_a4qPG! z7R|M;@&14R;_v^LXh!Z*odeE+KXpJY?bY{cP|2!3N6o9ek9>&CiFsp#B0-V)LjcE< ce;A_fgUXt!#j!!OAh{0#VuLH31Ao 1: + group_names_list.append("dp") + group_info_dict["dp"] = [self.dcb.get_data_parallel_group_ranks(), + self.dcb.get_data_parallel_group()] + if self.tp > 1: + group_names_list.append("tp") + group_info_dict["tp"] = [self.dcb.get_tensor_parallel_group_ranks(), + self.dcb.get_tensor_parallel_group()] + if self.pp > 1: + group_names_list.append("pp") + group_info_dict["pp"] = [self.dcb.get_pipeline_parallel_group_ranks(), + self.dcb.get_pipeline_parallel_group()] + if self.ep > 1: + group_names_list.append("ep") + group_info_dict["ep"] = [self.dcb.get_expert_parallel_group_ranks(), + self.dcb.get_expert_parallel_group()] + if self.cp > 1: + group_names_list.append("cp") + group_info_dict["cp"] = [self.dcb.get_context_parallel_group_ranks(), + self.dcb.get_context_parallel_group()] + return group_names_list, group_info_dict + + def p2p_comm(self, iteration: int = 5, batch_size=(32, 4096, 8192)): + """Perform point-to-point communication between ranks in the group""" + group_names_list, group_info_dict = self.comm_group_create() + local_cost = {"rank": dist.get_rank()} + if len(group_names_list) == 0: + raise ValueError("The tensor-parallel, pipeline-parallel, data-parallel, " + "expert-parallel, context-parallel is set less than 2, " + "no peer communication can be created") + tensor_send = torch.randn(batch_size, dtype=torch.float32).npu() + tensor_recv = torch.empty(batch_size, dtype=torch.float32).npu() + for group_name in group_names_list: + ranks, proc_group = group_info_dict[group_name] + local_cost[group_name] = [] + if len(ranks) <= 1: + raise ValueError(f"Failed to start communication group {group_name}," + f"since the group is {ranks}.") + for i in range(iteration): + if dist.get_rank() == 0: + logging.info(f">>Start communication: {group_name}, iteration: {i}") + tensor_send.uniform_(-1, 1) + if i == 0: + dist.barrier() + timer = Timer() + timer.start() + self.perform_p2p_communication_async(ranks, tensor_send, tensor_recv, proc_group) + timer.stop() + local_cost[group_name].append(timer.delta) + local_cost[f"{group_name}_avg_cost"] = sum(local_cost[group_name]) / iteration + self.dump_file(local_cost, self.output) + if self.no_share_storage: + self.collect_data() + + def get_file_path(self, file_path, file_name): + """get the file path of the given file name""" + if not os.path.exists(file_path): + logging.info(f"path is not exist, creating output dir: {file_path}") + os.makedirs(file_path) + return os.path.join(file_path, file_name) + + def dump_file(self, data, save_dir): + """save data to file as worker_x_rank_y.json""" + cur_rank = dist.get_rank() + worker_id = os.getenv("GROUP_RANK") + if worker_id is None: + raise ValueError("GROUP_ID environment variable is not set.") + if data is None: + raise ValueError(f"data is not created by rank {cur_rank}, please check this") + dump_path = self.get_file_path(self.tmp_path, f"worker_{worker_id}_rank_{cur_rank}.json") + try: + with open(dump_path, 'w') as f: + 
json.dump(data, f, indent=4)
+        except Exception as e:
+            logging.error(f"An error occurred while rank {cur_rank} saving data to {dump_path}: {e}")
+
+    def collect_data(self):
+        """
+        Gather the result files from every node onto the master node; this step is
+        unnecessary when shared storage is available.
+        """
+        worker_id = os.getenv("GROUP_RANK")
+        if worker_id is None:
+            raise ValueError("GROUP_RANK environment variable is not set.")
+        send_file_dir = self.get_file_path(self.tmp_path, f'worker_{worker_id}_rank_{self.rank}.json')
+        receive_file_dir = os.path.join(self.output, 'collected_data')
+        if self.rank == 0:
+            logging.info(f"master node {self.rank} is collecting data from other nodes")
+        self.dcb.collect_global_info(send_file_dir, receive_file_dir, time_out=1800, log_file="./comm_test.log")
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--world_size", type=int, default=16, required=True)
+    parser.add_argument("--output", type=str, default="./env_check.json")
+    parser.add_argument("--tensor_model_parallel_size", type=int, default=4, required=True)
+    parser.add_argument("--pipeline_model_parallel_size", type=int, default=4, required=True)
+    parser.add_argument("--context_model_parallel_size", type=int, default=1, required=True)
+    parser.add_argument("--expert_model_parallel_size", type=int, default=1, required=True)
+    parser.add_argument("--no_shared_storage", action="store_true")  # __init__ reads args.no_shared_storage
+    args = parser.parse_args()

-    def check(self):
-        pass
+    comm_check = CommunicationCheck(args=args)
+    comm_check.p2p_comm()
+    comm_check.comm_group_destroy()
\ No newline at end of file
diff --git a/profiler/msprof_analyze/precheck/env_check/io_check.py b/profiler/msprof_analyze/precheck/env_check/io_check.py
index 35780cb63b..7ea9a4f53c 100644
--- a/profiler/msprof_analyze/precheck/env_check/io_check.py
+++ b/profiler/msprof_analyze/precheck/env_check/io_check.py
@@ -12,6 +12,13 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+ +import os +import time +import logging +import shutil +import json +import numpy as np from msprof_analyze.precheck.env_check.environment_check import HardwareCheck @@ -20,6 +27,164 @@ class IOCheck(HardwareCheck): def __init__(self, args): super().__init__(args) + self.base_dir = './data_file' + self.global_worker_id = os.getenv("GROUP_RANK") + self.local_rank = os.getenv("LOCAL_RANK") + self.ensure_base_dir_exists() + self.output = args.output if args is not None else './results.json' + self.results = {'local_rank': self.local_rank, + 'global_rank': self.global_worker_id, + 'read_cost': { + 'ckpt_read': 0, + 'log_read': 0, + }, + 'write_cost': { + 'ckpt_write': 0, + 'log_write': 0, + }} + + def ensure_base_dir_exists(self): + """确保数据目录存在""" + if not os.path.exists(self.base_dir): + os.makedirs(self.base_dir) + + def generate_log_message(self, index): + """生成日志消息""" + return (f"this is the create txt file for IO check in cluster, you can ignore this information " + f"and check the result in another file [{time.strftime('%Y-%m-%d %H:%M:%S')}] " + f"这是第 {index + 1} 条日志信息。\n") + + def get_file_path(self, index, file_type): + """生成文件路径""" + if file_type == "txt": + return os.path.join(self.base_dir, f'worker_{self.global_worker_id}_data_{index}.txt') + elif file_type == "npy": + return os.path.join(self.base_dir, f'worker_{self.global_worker_id}_data_{index}.npy') + else: + raise ValueError(f"file type {file_type} is not included in the list [txt, npy]") + + def is_local_rank_zero(self): + """检查本地排名是否为 0""" + return self.local_rank is not None and int(self.local_rank) == 0 + + def calculate_speed(self, start_time, end_time, data_size): + """计算读写速度""" + elapsed_time = end_time - start_time + return data_size / elapsed_time / (1024 ** 2) + + def generate_random_weights(self, shape=(4, 2048, 4096)): + """生成随机权重""" + return np.random.randn(*shape).astype(np.float32) + + def generate_file_like_log(self, iteration: int = 10000, scaler: int = 100): + """生成数据文件并计算写入速度""" + index = self.local_rank + file_path = self.get_file_path(index, file_type='txt') + start_time = time.time() + total_data_size = 0 + + with open(file_path, 'w', encoding='utf-8') as file: + for i in range(iteration): + log_message = self.generate_log_message(i) * scaler + file.write(log_message) + file.flush() + total_data_size += len(log_message) + end_time = time.time() + write_speed = self.calculate_speed(start_time, end_time, total_data_size) + self.results['write_cost']['log_write'] = write_speed + return file_path + + def read_file_like_log(self, file_path=None): + """读取单个文件并计算读取速度""" + if file_path is None: + file_path = self.get_file_path(self.local_rank, file_type='txt') + try: + start_time = time.time() + data = "" + with open(file_path, 'r', encoding='utf-8') as file: + for line in file: + data += line + end_time = time.time() + data_size = len(data.encode('utf-8')) + read_speed = self.calculate_speed(start_time, end_time, data_size) + self.results['read_cost']['log_read'] = read_speed + return data + except FileNotFoundError: + logging.error(f"File {file_path} not found.") + except UnicodeDecodeError: + logging.error(f"Error decoding {file_path} as UTF-8.") + except Exception as e: + logging.error(f"Unexpected error reading {file_path}: {e}") + return None + + def generate_file_like_ckpt(self): + """生成大数据文件并计算写入速度""" + index = self.local_rank + file_path = self.get_file_path(index, file_type='npy') + weight = self.generate_random_weights() + total_data_size = weight.size * weight.itemsize + start_time = time.time() + 
np.save(file_path, weight) + end_time = time.time() + write_speed = self.calculate_speed(start_time, end_time, total_data_size) + self.results['write_cost']['ckpt_write'] = write_speed + return file_path + + def read_file_like_ckpt(self, file_path=None): + """读取大的单个文件并计算读取速度""" + if file_path is None: + file_path = self.get_file_path(self.local_rank, file_type='npy') + try: + start_time = time.time() + data = np.load(file_path) + end_time = time.time() + data_size = data.nbytes + read_speed = self.calculate_speed(start_time, end_time, data_size) + self.results['read_cost']['ckpt_read'] = read_speed + return data + except FileNotFoundError: + logging.error(f"File {file_path} not found.") + except UnicodeDecodeError: + logging.error(f"Error decoding {file_path} as UTF-8.") + except Exception as e: + logging.error(f"Unexpected error reading {file_path}: {e}") + return None + + def clean_cache_file(self, file_type=None): + """执行完删除缓存文件,避免磁盘空间不足""" + if file_type == 'txt': + file_path = self.get_file_path(self.local_rank, file_type='txt') + elif file_type == 'npy': + file_path = self.get_file_path(self.local_rank, file_type='npy') + else: + raise ValueError(f"no such file type {file_type} could be clean as cache file loaded temperaly") + try: + os.remove(file_path) + except FileNotFoundError: + logging.error(f"File {file_path} not found, cannot remove.") + except PermissionError: + logging.error(f"Permission denied when trying to remove {file_path}.") + except Exception as e: + logging.error(f"Unexpected error removing {file_path}: {e}") + + def dump_results(self): + """生成临时结果用于分析""" + try: + with open(self.output, 'w') as f: + json.dump(self.results, f, indent=4) + logging.info(f"Results have been saved to {self.output}") + except Exception as e: + logging.error(f"Error saving results to {self.output}: {e}") + + + + - def check(self): - pass +if __name__ == "__main__": + io = IOCheck() + io.generate_file_like_log() + io.read_file_like_log() + io.generate_file_like_ckpt() + io.read_file_like_ckpt() + io.clean_cache_file('txt') + io.clean_cache_file('npy') \ No newline at end of file -- Gitee From 5944343f14d2f62df35e1ed04cf8fd3df9e5425b Mon Sep 17 00:00:00 2001 From: zbl Date: Wed, 12 Mar 2025 15:29:14 +0800 Subject: [PATCH 2/6] add analyze --- .../precheck/env_check/communication_check.py | 40 ++++++++++++++----- .../precheck/env_check/io_check.py | 23 ++++++----- 2 files changed, 42 insertions(+), 21 deletions(-) diff --git a/profiler/msprof_analyze/precheck/env_check/communication_check.py b/profiler/msprof_analyze/precheck/env_check/communication_check.py index 3a8504de6d..fd7195ec84 100644 --- a/profiler/msprof_analyze/precheck/env_check/communication_check.py +++ b/profiler/msprof_analyze/precheck/env_check/communication_check.py @@ -162,6 +162,11 @@ class CommunicationCheck(HardwareCheck): group_info_dict["cp"] = [self.dcb.get_context_parallel_group_ranks(), self.dcb.get_context_parallel_group()] return group_names_list, group_info_dict + + def get_group_name(self): + if len(self.group_names_list) ==0: + return self.group_names_list + def p2p_comm(self, iteration: int = 5, batch_size=(32, 4096, 8192)): """Perform point-to-point communication between ranks in the group""" @@ -217,7 +222,7 @@ class CommunicationCheck(HardwareCheck): except Exception as e: logging.error(f"An error occurred while rank {cur_rank} saving data to {dump_path}: {e}") - def collect_data(self): + def collect(self): """ 将全部节点搜集的日志汇聚到master节点,共享存储不需要该操作 """ @@ -230,6 +235,29 @@ class 
CommunicationCheck(HardwareCheck): logging.info(f"master node {self.rank} is collecting data from other nodes") self.dcb.collect_global_info(send_file_dir, receive_file_dir, time_out=1800, log_file="./comm_test.log") + def analyze(self, save_dir): + """ + 遍历指定路径下的所有JSON文件,并获取指定键对应的value + :param path: 指定的路径 + :param target_key: 要查找的键 + :return: 包含文件路径和对应值的字典 + """ + result = {} + # 生成所有JSON文件的路径 + json_files = (os.path.join(root, file) for root, _, files in os.walk(save_dir) for file in files if file.endswith('.json')) + for file_path in json_files: + try: + # 打开并加载JSON文件 + with open(file_path, 'r', encoding='utf-8') as f: + data = json.load(f) + for key in data: + result.setdefault(key, []).append(data[key]) + except json.JSONDecodeError: + print(f"Error decoding JSON in {file_path}") + except Exception as e: + print(f"An error occurred while processing {file_path}: {e}") + return result + if __name__ == "__main__": parser = argparse.ArgumentParser() @@ -241,14 +269,6 @@ if __name__ == "__main__": parser.add_argument("--expert_model_parallel_size", type=int, default=1, required=True) args = parser.parse_args() -<<<<<<< HEAD comm_check = CommunicationCheck(args=args) comm_check.p2p_comm() - comm_check.comm_group_destroy() -======= - def collect(self): - pass - - def analyze(self): - pass ->>>>>>> cd8a3cee398c52a3c0931a592fe47920c74ed884 + comm_check.comm_group_destroy() \ No newline at end of file diff --git a/profiler/msprof_analyze/precheck/env_check/io_check.py b/profiler/msprof_analyze/precheck/env_check/io_check.py index 384e48f723..133274b619 100644 --- a/profiler/msprof_analyze/precheck/env_check/io_check.py +++ b/profiler/msprof_analyze/precheck/env_check/io_check.py @@ -176,11 +176,19 @@ class IOCheck(HardwareCheck): except Exception as e: logging.error(f"Error saving results to {self.output}: {e}") - - + + def collect(self): + pass + + def analyze(self, save_dir, target_keys: list): + pass + + + + + -<<<<<<< HEAD if __name__ == "__main__": io = IOCheck() io.generate_file_like_log() @@ -188,11 +196,4 @@ if __name__ == "__main__": io.generate_file_like_ckpt() io.read_file_like_ckpt() io.clean_cache_file('txt') - io.clean_cache_file('npy') -======= - def collect(self): - pass - - def analyze(self): - pass ->>>>>>> cd8a3cee398c52a3c0931a592fe47920c74ed884 + io.clean_cache_file('npy') \ No newline at end of file -- Gitee From 11e7c81b169edbd0352f59e112e435aec036e04f Mon Sep 17 00:00:00 2001 From: zbl Date: Wed, 12 Mar 2025 20:18:29 +0800 Subject: [PATCH 3/6] add analyze --- .../precheck/env_check/communication_check.py | 37 +++---- .../precheck/env_check/io_check.py | 104 ++++++++++++------ 2 files changed, 86 insertions(+), 55 deletions(-) diff --git a/profiler/msprof_analyze/precheck/env_check/communication_check.py b/profiler/msprof_analyze/precheck/env_check/communication_check.py index fd7195ec84..09cbc4fe91 100644 --- a/profiler/msprof_analyze/precheck/env_check/communication_check.py +++ b/profiler/msprof_analyze/precheck/env_check/communication_check.py @@ -22,7 +22,7 @@ from analyze import Timer import torch import torch_npu from torch import distributed as dist -from distributed_cluster_base import DistributedClusterBase +from msprof_analyze.precheck.distributed_cluster.distributed_cluster_base import DistributedClusterBase from msprof_analyze.precheck.env_check.environment_check import HardwareCheck @@ -32,8 +32,7 @@ class CommunicationCheck(HardwareCheck): def __init__(self, args): super().__init__(args) self.world_size = args.world_size - self.output = args.output if args 
is not None else "./env_check.json" - self.tmp_path = '/tmp/p2p_test' + self.output = args.output if args is not None else "./" self.tp = args.tensor_model_parallel_size self.pp = args.pipeline_model_parallel_size self.cp = args.context_model_parallel_size @@ -42,6 +41,7 @@ class CommunicationCheck(HardwareCheck): self.initialize() self.rank = dist.get_rank() self.no_share_storage = False or args.no_shared_storage + self.tmp_path = '/tmp/p2p_test' def validate_parallel_strategy(self): if self.tp * self.pp * self.cp * self.ep == 0: @@ -196,10 +196,14 @@ class CommunicationCheck(HardwareCheck): timer.stop() local_cost[group_name].append(timer.delta) local_cost[f"{group_name}_avg_cost"] = sum(local_cost[group_name]) / iteration - self.dump_file(local_cost, self.output) if self.no_share_storage: - self.collect_data() - + self.collect(local_cost, self.tmp_path) + else: + tmp_dir = os.path.join(self.output, 'tmp') + if not os.path.exists(tmp_dir): + os.makedirs(tmp_dir) + self.collect(local_cost, tmp_dir) + def get_file_path(self, file_path, file_name): """get the file path of the given file name""" if not os.path.exists(file_path): @@ -207,7 +211,7 @@ class CommunicationCheck(HardwareCheck): os.makedirs(file_path) return os.path.join(file_path, file_name) - def dump_file(self, data, save_dir): + def collect(self, data, save_dir): """save data to file as worker_x_rank_y.json""" cur_rank = dist.get_rank() worker_id = os.getenv("GROUP_RANK") @@ -215,25 +219,13 @@ class CommunicationCheck(HardwareCheck): raise ValueError("GROUP_ID environment variable is not set.") if data is None: raise ValueError(f"data is not created by rank {cur_rank}, please check this") - dump_path = self.get_file_path(self.tmp_path, f"worker_{worker_id}_rank_{cur_rank}.json") + dump_path = self.get_file_path(save_dir, f"worker_{worker_id}_rank_{cur_rank}.json") try: with open(dump_path, 'w') as f: json.dump(data, f, indent=4) except Exception as e: logging.error(f"An error occurred while rank {cur_rank} saving data to {dump_path}: {e}") - def collect(self): - """ - 将全部节点搜集的日志汇聚到master节点,共享存储不需要该操作 - """ - worker_id = os.getenv("GROUP_RANK") - if worker_id is None: - raise ValueError("GROUP_ID environment variable is not set.") - send_file_dir = self.get_file_path(self.tmp_path, f'worker_{worker_id}_rank_{self.rank}.json') - receive_file_dir = os.path.join(self.output, 'collected_data') - if self.rank == 0: - logging.info(f"master node {self.rank} is collecting data from other nodes") - self.dcb.collect_global_info(send_file_dir, receive_file_dir, time_out=1800, log_file="./comm_test.log") def analyze(self, save_dir): """ @@ -243,6 +235,8 @@ class CommunicationCheck(HardwareCheck): :return: 包含文件路径和对应值的字典 """ result = {} + if self.rank != 0: + return # 生成所有JSON文件的路径 json_files = (os.path.join(root, file) for root, _, files in os.walk(save_dir) for file in files if file.endswith('.json')) for file_path in json_files: @@ -256,7 +250,8 @@ class CommunicationCheck(HardwareCheck): print(f"Error decoding JSON in {file_path}") except Exception as e: print(f"An error occurred while processing {file_path}: {e}") - return result + with open(self.output + "/communication_check.json", "w") as f: + json.dump(result, f, indent=4) if __name__ == "__main__": diff --git a/profiler/msprof_analyze/precheck/env_check/io_check.py b/profiler/msprof_analyze/precheck/env_check/io_check.py index 133274b619..08fe05d5fc 100644 --- a/profiler/msprof_analyze/precheck/env_check/io_check.py +++ b/profiler/msprof_analyze/precheck/env_check/io_check.py @@ 
-27,26 +27,22 @@ class IOCheck(HardwareCheck): def __init__(self, args): super().__init__(args) - self.base_dir = './data_file' self.global_worker_id = os.getenv("GROUP_RANK") self.local_rank = os.getenv("LOCAL_RANK") - self.ensure_base_dir_exists() + self.ensure_tmp_work_dir_exists() self.output = args.output if args is not None else './results.json' - self.results = {'local_rank': self.local_rank, - 'global_rank': self.global_worker_id, - 'read_cost': { - 'ckpt_read': 0, - 'log_read': 0, - }, - 'write_cost': { - 'ckpt_write': 0, - 'log_write': 0, + self.tmp_work_dir = './data_file' if args and args.no_shared_storage else '/tmp/data_file' + self.results = {self.global_worker_id: { + 'ckpt_read': [], + 'log_read': [], + 'ckpt_write': [], + 'log_write': [], }} - def ensure_base_dir_exists(self): + def ensure_tmp_work_dir_exists(self): """确保数据目录存在""" - if not os.path.exists(self.base_dir): - os.makedirs(self.base_dir) + if not os.path.exists(self.tmp_work_dir): + os.makedirs(self.tmp_work_dir) def generate_log_message(self, index): """生成日志消息""" @@ -57,9 +53,11 @@ class IOCheck(HardwareCheck): def get_file_path(self, index, file_type): """生成文件路径""" if file_type == "txt": - return os.path.join(self.base_dir, f'worker_{self.global_worker_id}_data_{index}.txt') + return os.path.join(self.tmp_work_dir, + f'worker_{self.global_worker_id}_data_{index}.txt') elif file_type == "npy": - return os.path.join(self.base_dir, f'worker_{self.global_worker_id}_data_{index}.npy') + return os.path.join(self.tmp_work_dir, + f'worker_{self.global_worker_id}_data_{index}.npy') else: raise ValueError(f"file type {file_type} is not included in the list [txt, npy]") @@ -91,7 +89,7 @@ class IOCheck(HardwareCheck): total_data_size += len(log_message) end_time = time.time() write_speed = self.calculate_speed(start_time, end_time, total_data_size) - self.results['write_cost']['log_write'] = write_speed + self.results[self.global_worker_id]['log_write'].append(write_speed) return file_path def read_file_like_log(self, file_path=None): @@ -107,7 +105,7 @@ class IOCheck(HardwareCheck): end_time = time.time() data_size = len(data.encode('utf-8')) read_speed = self.calculate_speed(start_time, end_time, data_size) - self.results['read_cost']['log_read'] = read_speed + self.results[self.global_worker_id]['log_read'].append(read_speed) return data except FileNotFoundError: logging.error(f"File {file_path} not found.") @@ -127,7 +125,7 @@ class IOCheck(HardwareCheck): np.save(file_path, weight) end_time = time.time() write_speed = self.calculate_speed(start_time, end_time, total_data_size) - self.results['write_cost']['ckpt_write'] = write_speed + self.results[self.global_worker_id]['ckpt_write'].append(write_speed) return file_path def read_file_like_ckpt(self, file_path=None): @@ -140,7 +138,7 @@ class IOCheck(HardwareCheck): end_time = time.time() data_size = data.nbytes read_speed = self.calculate_speed(start_time, end_time, data_size) - self.results['read_cost']['ckpt_read'] = read_speed + self.results[self.global_worker_id]['ckpt_read'].append(read_speed) return data except FileNotFoundError: logging.error(f"File {file_path} not found.") @@ -157,7 +155,8 @@ class IOCheck(HardwareCheck): elif file_type == 'npy': file_path = self.get_file_path(self.local_rank, file_type='npy') else: - raise ValueError(f"no such file type {file_type} could be clean as cache file loaded temperaly") + raise ValueError(f"no such file type {file_type} could be clean" + f"since cache file loaded temperaly") try: os.remove(file_path) except 
FileNotFoundError: @@ -167,7 +166,7 @@ class IOCheck(HardwareCheck): except Exception as e: logging.error(f"Unexpected error removing {file_path}: {e}") - def dump_results(self): + def collect(self): """生成临时结果用于分析""" try: with open(self.output, 'w') as f: @@ -175,18 +174,53 @@ class IOCheck(HardwareCheck): logging.info(f"Results have been saved to {self.output}") except Exception as e: logging.error(f"Error saving results to {self.output}: {e}") - - - def collect(self): - pass - - def analyze(self, save_dir, target_keys: list): - pass - - - - + def process_dict(self, data): + """处理字典数据""" + if data is None: + return + for key, value in data.items(): + if isinstance(value, dict): + self.process_dict(value) + else: + data[key] = sum(value) / len(value) + return data + + + def analyze_and_save(self, file_dir): + """ + 遍历指定路径下的所有JSON文件,并获取指定键对应的value,并将结果保存到指定的文件中 + """ + if int(os.getenv("RANK")) != 0: + return + data_to_merge = self.results + + files_dir = (os.path.join(root, file) for root, _, files in + os.walk(file_dir) for file in files if file.endswith('.json')) + for file_path in files_dir: + try: + with open(file_path, 'r') as file: + file_data = json.load(file) + def merge_dicts(dict1, dict2): + for key, value in dict2.items(): + if key in dict1: + if isinstance(dict1[key], dict) and isinstance(value, dict): + merge_dicts(dict1[key], value) + elif isinstance(dict1[key], list) and isinstance(value, list): + dict1[key].extend(value) + else: + dict1[key] = value + else: + dict1[key] = value + return dict1 + data_to_merge = merge_dicts(data_to_merge, file_data) + except FileNotFoundError: + print(f"文件 {file_path} 未找到。") + except json.JSONDecodeError: + print(f"文件 {file_path} 不是有效的 JSON 文件。") + data_to_merge = self.process_dict(data_to_merge) + with open('./final_result.json', 'w') as f: + json.dump(data_to_merge, f, indent=4) if __name__ == "__main__": @@ -196,4 +230,6 @@ if __name__ == "__main__": io.generate_file_like_ckpt() io.read_file_like_ckpt() io.clean_cache_file('txt') - io.clean_cache_file('npy') \ No newline at end of file + io.clean_cache_file('npy') + io.collect() + io.analyze('./tmp_result/') \ No newline at end of file -- Gitee From df5b44b18bbb455fbbf9888036d740bcc05a2dee Mon Sep 17 00:00:00 2001 From: zbl Date: Wed, 12 Mar 2025 20:28:26 +0800 Subject: [PATCH 4/6] add analyze --- .../precheck/env_check/communication_check.py | 26 ++++++++++++++++++- .../precheck/env_check/io_check.py | 5 ++-- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/profiler/msprof_analyze/precheck/env_check/communication_check.py b/profiler/msprof_analyze/precheck/env_check/communication_check.py index 09cbc4fe91..dadcfb2134 100644 --- a/profiler/msprof_analyze/precheck/env_check/communication_check.py +++ b/profiler/msprof_analyze/precheck/env_check/communication_check.py @@ -227,7 +227,8 @@ class CommunicationCheck(HardwareCheck): logging.error(f"An error occurred while rank {cur_rank} saving data to {dump_path}: {e}") - def analyze(self, save_dir): + def collect(self): + """ 遍历指定路径下的所有JSON文件,并获取指定键对应的value :param path: 指定的路径 @@ -253,6 +254,29 @@ class CommunicationCheck(HardwareCheck): with open(self.output + "/communication_check.json", "w") as f: json.dump(result, f, indent=4) + def analyze(self, save_dir): + """ + 遍历指定路径下的所有JSON文件,并获取指定键对应的value + :param path: 指定的路径 + :param target_key: 要查找的键 + :return: 包含文件路径和对应值的字典 + """ + result = {} + # 生成所有JSON文件的路径 + json_files = (os.path.join(root, file) for root, _, files in os.walk(save_dir) for file in files if 
file.endswith('.json')) + for file_path in json_files: + try: + # 打开并加载JSON文件 + with open(file_path, 'r', encoding='utf-8') as f: + data = json.load(f) + for key in data: + result.setdefault(key, []).append(data[key]) + except json.JSONDecodeError: + print(f"Error decoding JSON in {file_path}") + except Exception as e: + print(f"An error occurred while processing {file_path}: {e}") + return result + if __name__ == "__main__": parser = argparse.ArgumentParser() diff --git a/profiler/msprof_analyze/precheck/env_check/io_check.py b/profiler/msprof_analyze/precheck/env_check/io_check.py index 08fe05d5fc..22087abf6d 100644 --- a/profiler/msprof_analyze/precheck/env_check/io_check.py +++ b/profiler/msprof_analyze/precheck/env_check/io_check.py @@ -186,7 +186,6 @@ class IOCheck(HardwareCheck): data[key] = sum(value) / len(value) return data - def analyze_and_save(self, file_dir): """ 遍历指定路径下的所有JSON文件,并获取指定键对应的value,并将结果保存到指定的文件中 @@ -232,4 +231,6 @@ if __name__ == "__main__": io.clean_cache_file('txt') io.clean_cache_file('npy') io.collect() - io.analyze('./tmp_result/') \ No newline at end of file + io.analyze('./tmp_result/') + io.clean_cache_file('npy') + -- Gitee From 510ad7730c8974184f9465975c7c8bb243f9a9fd Mon Sep 17 00:00:00 2001 From: zbl Date: Wed, 12 Mar 2025 20:37:17 +0800 Subject: [PATCH 5/6] add analyze --- profiler/msprof_analyze/precheck/env_check/io_check.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/profiler/msprof_analyze/precheck/env_check/io_check.py b/profiler/msprof_analyze/precheck/env_check/io_check.py index 22087abf6d..7093da8bf2 100644 --- a/profiler/msprof_analyze/precheck/env_check/io_check.py +++ b/profiler/msprof_analyze/precheck/env_check/io_check.py @@ -186,7 +186,7 @@ class IOCheck(HardwareCheck): data[key] = sum(value) / len(value) return data - def analyze_and_save(self, file_dir): + def analyze(self, file_dir): """ 遍历指定路径下的所有JSON文件,并获取指定键对应的value,并将结果保存到指定的文件中 """ -- Gitee From 79475aa1e467fb4b12a1487e6e9d29691db6d25c Mon Sep 17 00:00:00 2001 From: zbl Date: Wed, 12 Mar 2025 20:49:30 +0800 Subject: [PATCH 6/6] add analyze --- .../precheck/env_check/communication_check.py | 39 +++---------------- .../precheck/env_check/io_check.py | 4 +- 2 files changed, 7 insertions(+), 36 deletions(-) diff --git a/profiler/msprof_analyze/precheck/env_check/communication_check.py b/profiler/msprof_analyze/precheck/env_check/communication_check.py index dadcfb2134..fab4e4b2f8 100644 --- a/profiler/msprof_analyze/precheck/env_check/communication_check.py +++ b/profiler/msprof_analyze/precheck/env_check/communication_check.py @@ -167,7 +167,6 @@ class CommunicationCheck(HardwareCheck): if len(self.group_names_list) ==0: return self.group_names_list - def p2p_comm(self, iteration: int = 5, batch_size=(32, 4096, 8192)): """Perform point-to-point communication between ranks in the group""" group_names_list, group_info_dict = self.comm_group_create() @@ -226,17 +225,12 @@ class CommunicationCheck(HardwareCheck): except Exception as e: logging.error(f"An error occurred while rank {cur_rank} saving data to {dump_path}: {e}") - - def collect(self): - + def analyze(self, save_dir): """ 遍历指定路径下的所有JSON文件,并获取指定键对应的value - :param path: 指定的路径 - :param target_key: 要查找的键 - :return: 包含文件路径和对应值的字典 """ result = {} - if self.rank != 0: + if int(os.getenv('GROUP_RANK')) != 0: return # 生成所有JSON文件的路径 json_files = (os.path.join(root, file) for root, _, files in os.walk(save_dir) for file in files if file.endswith('.json')) @@ -248,35 +242,12 @@ class 
CommunicationCheck(HardwareCheck): for key in data: result.setdefault(key, []).append(data[key]) except json.JSONDecodeError: - print(f"Error decoding JSON in {file_path}") + logging.error(f"Error decoding JSON in {file_path}") except Exception as e: - print(f"An error occurred while processing {file_path}: {e}") - with open(self.output + "/communication_check.json", "w") as f: + logging.error(f"An error occurred while processing {file_path}: {e}") + with open(self.output + '/communication_check.json', 'w', encoding='utf-8') as f: json.dump(result, f, indent=4) - def analyze(self, save_dir): - """ - 遍历指定路径下的所有JSON文件,并获取指定键对应的value - :param path: 指定的路径 - :param target_key: 要查找的键 - :return: 包含文件路径和对应值的字典 - """ - result = {} - # 生成所有JSON文件的路径 - json_files = (os.path.join(root, file) for root, _, files in os.walk(save_dir) for file in files if file.endswith('.json')) - for file_path in json_files: - try: - # 打开并加载JSON文件 - with open(file_path, 'r', encoding='utf-8') as f: - data = json.load(f) - for key in data: - result.setdefault(key, []).append(data[key]) - except json.JSONDecodeError: - print(f"Error decoding JSON in {file_path}") - except Exception as e: - print(f"An error occurred while processing {file_path}: {e}") - return result - if __name__ == "__main__": parser = argparse.ArgumentParser() diff --git a/profiler/msprof_analyze/precheck/env_check/io_check.py b/profiler/msprof_analyze/precheck/env_check/io_check.py index 7093da8bf2..bade5fb94e 100644 --- a/profiler/msprof_analyze/precheck/env_check/io_check.py +++ b/profiler/msprof_analyze/precheck/env_check/io_check.py @@ -214,9 +214,9 @@ class IOCheck(HardwareCheck): return dict1 data_to_merge = merge_dicts(data_to_merge, file_data) except FileNotFoundError: - print(f"文件 {file_path} 未找到。") + logging.error(f"文件 {file_path} 未找到。") except json.JSONDecodeError: - print(f"文件 {file_path} 不是有效的 JSON 文件。") + logging.error(f"文件 {file_path} 不是有效的 JSON 文件。") data_to_merge = self.process_dict(data_to_merge) with open('./final_result.json', 'w') as f: json.dump(data_to_merge, f, indent=4) -- Gitee
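
Note on the timing helper: p2p_comm() in communication_check.py measures each round trip with a Timer imported from analyze.py (added in patch 1 but not visible in this excerpt), used as timer.start() / timer.stop() / timer.delta around perform_p2p_communication_async(). The sketch below is only a minimal illustration consistent with that usage; it is an assumption, not the actual analyze.py implementation, and the torch.npu.synchronize() calls are likewise an assumed way of making the interval cover the asynchronous NPU send/recv.

    # Hypothetical Timer sketch matching the start()/stop()/delta usage in p2p_comm();
    # the real analyze.py implementation is not shown in this patch series excerpt.
    import time

    import torch
    import torch_npu  # noqa: F401  registers the NPU backend for torch

    class Timer:
        def __init__(self):
            self.delta = 0.0
            self._start = None

        def start(self):
            # Assumed: flush already-queued NPU work so it is not billed to this interval.
            torch.npu.synchronize()
            self._start = time.perf_counter()

        def stop(self):
            # Assumed: wait for the asynchronous send/recv to complete before reading the clock.
            torch.npu.synchronize()
            self.delta = time.perf_counter() - self._start

Because communication_check.py and io_check.py read RANK, LOCAL_RANK and GROUP_RANK from the environment, the checks appear intended to be launched one process per NPU with torchrun, which sets those variables; this is an inference from the code rather than something stated in the patches.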