From 87f1a236e041f8682d60c69ecd65bf76a8b34cfb Mon Sep 17 00:00:00 2001 From: huangbin Date: Thu, 12 Jun 2025 15:59:01 +0800 Subject: [PATCH] convert json to csv; match node ip for output. --- .../failslow/dataloader/marker_data_reader.py | 35 ++-- systrace/src/mspti/json_file_writer.h | 190 ++++++++---------- systrace/src/mspti/mspti_tracker.cpp | 2 +- 3 files changed, 103 insertions(+), 124 deletions(-) diff --git a/failslow/failslow/dataloader/marker_data_reader.py b/failslow/failslow/dataloader/marker_data_reader.py index 74c7ccc..a80c5f9 100644 --- a/failslow/failslow/dataloader/marker_data_reader.py +++ b/failslow/failslow/dataloader/marker_data_reader.py @@ -54,8 +54,16 @@ class MarkerDataloader: return data_df + def get_node_ip_by_rank(self, rank): + for node_ip, ranks in self.node_id2ranks_dict.items(): + if rank in ranks: + return node_ip + + return None + def read_local_device_df_by_rank(self, rank: int): - file = f"hccl_activity.{rank}.csv" + node_ip = self.get_node_ip_by_rank(rank) + file = f"hccl_activity-{node_ip}-.{rank}.csv" if file in self.csv_files: local_device_path = self.local_d_files.get(file, None) if local_device_path: @@ -73,7 +81,8 @@ class MarkerDataloader: return comm_results def read_local_op_launch_df_by_rank(self, rank: int): - file = f"hccl_activity.{rank}.csv" + node_ip = self.get_node_ip_by_rank(rank) + file = f"hccl_activity-{node_ip}-.{rank}.csv" if file in self.csv_files: local_device_path = self.local_op_launch_files.get(file, None) if local_device_path: @@ -178,7 +187,8 @@ class MarkerDataloader: device_df = self.extract_device_df(data_df) op_launch_df = self.extract_op_launch_df(data_df) - device_ids = int(csv_file.split(".")[1]) + device_ids = int(csv_file.split(".")[-2]) + self.get_node_ids_from_filepath(csv_file, device_ids) if not len(device_df): self.empty_data_ranks.append(device_ids) # 分列以及生成start,end timestamp @@ -187,8 +197,7 @@ class MarkerDataloader: self.save_device_df(device_df, csv_file) self.save_op_launch_df(op_launch_df, csv_file) if len(device_df): - comm_groups_ids = device_df[TableItem.ex_comm_group].unique() - self.get_node_ids_from_comm_groups(comm_groups_ids, device_ids) + comm_groups_ids = device_df[TableItem.ex_comm_group].unique() else: comm_groups_ids = [] selected_indices, comm_ops = self.get_ops_by_comm_name(comm_groups_ids, device_df) @@ -198,19 +207,17 @@ class MarkerDataloader: comm_groups = self.create_comm_groups(comm_groups_ids, selected_indices, comm_ops, device_ids, count_ops) self.extend_group_ranks(all_comm_groups, comm_groups) - self.fix_node_ids2ranks() logger.info(f"node id and ranks: {self.node_id2ranks_dict}") all_comm_groups = self.get_fp_comm_groups(all_comm_groups) return all_comm_groups - def get_node_ids_from_comm_groups(self, comm_groups_ids: list, rank: int): - for comm_group_id in comm_groups_ids: - node_id = comm_group_id.split("%")[0] - if node_id not in self.node_id2ranks_dict.keys(): - self.node_id2ranks_dict[node_id] = [rank] - else: - self.node_id2ranks_dict[node_id].append(rank) - break + def get_node_ids_from_filepath(self, csv_file: str, rank: int): + ''' csv_file: hccl_activity-9.13.100.7-.0.csv ''' + node_id = csv_file.split("-")[1] + if node_id not in self.node_id2ranks_dict.keys(): + self.node_id2ranks_dict[node_id] = [rank] + else: + self.node_id2ranks_dict[node_id].append(rank) def fix_node_ids2ranks(self): for rank_empty in self.empty_data_ranks: diff --git a/systrace/src/mspti/json_file_writer.h b/systrace/src/mspti/json_file_writer.h index 9b19788..de298c3 100644 --- a/systrace/src/mspti/json_file_writer.h +++ b/systrace/src/mspti/json_file_writer.h @@ -12,27 +12,24 @@ #include #include -class MSPTIHcclFileWriter -{ - private: +class MSPTIHcclFileWriter { +private: std::ofstream file; std::mutex buffermtx; std::mutex bufferMarkerMtx; std::mutex threadmtx; - std::atomic opened; + std::atomic opened; std::unique_ptr> markerActivityBuffer; std::thread writerThread; std::condition_variable cv; std::atomic stop; - Json::Value root = Json::Value(Json::ValueType::arrayValue); - public: - MSPTIHcclFileWriter(const std::string &filename) - { +public: + MSPTIHcclFileWriter(const std::string& filename) { // obtain environment variable LOCAL_RANK // to determine the rank of the process // and append it to the filename - const char *path = std::getenv("METRIC_PATH"); + const char* path = std::getenv("METRIC_PATH"); std::string savePath = path ? path : SYS_TRACE_ROOT_DIR "mspti/"; if (systrace::util::fs_utils::CreateDirectoryIfNotExists(savePath)) { @@ -40,150 +37,125 @@ class MSPTIHcclFileWriter return; } std::string savePathStr = savePath; - if (!savePathStr.empty() && savePathStr.back() != '/') - { + if (!savePathStr.empty() && savePathStr.back() != '/') { savePathStr += "/"; } std::string saveFilename = savePathStr + filename; std::string filenameWithRank = saveFilename; - this->markerActivityBuffer = - std::make_unique>(); + this->markerActivityBuffer = std::make_unique>(); - const char *localRankCStr = std::getenv("RANK"); - if (localRankCStr == nullptr) - { + const char* localRankCStr = std::getenv("RANK"); + if (localRankCStr == nullptr) { localRankCStr = "-1"; } - std::string localRank = - localRankCStr; // Now safe to construct std::string - auto rank = std::stoi(localRank); - if (saveFilename.length() >= 5 && - saveFilename.substr(saveFilename.length() - 5) == ".json") - { - std::string baseName = - saveFilename.substr(0, saveFilename.length() - 5); - filenameWithRank = baseName + "." + std::to_string(rank) + ".json"; - } - else - { + std::string localRank = localRankCStr; + auto rank = std::stoi(localRank); + if (saveFilename.length() >= 4 && saveFilename.substr(saveFilename.length() - 4) == ".csv") { + std::string baseName = saveFilename.substr(0, saveFilename.length() - 4); + filenameWithRank = baseName + "." + std::to_string(rank) + ".csv"; + } else { filenameWithRank = saveFilename + "." + std::to_string(rank); } std::cout << "Filename: " << filenameWithRank << std::endl; - this->file.open(filenameWithRank, std::ios::out | std::ios::app); - this->opened.store(true); + + // if file does not exists + // create it and write header + if (this->fileExists(filenameWithRank)) { + this->file.open(filenameWithRank, std::ios::out | std::ios::app); + this->opened.store(true); + } else { + this->file.open(filenameWithRank, std::ios::out | std::ios::app); + this->opened.store(true); + this->file << "Flag,Id,Kind,Name,SourceKind,Timestamp,msptiObjectId_Ds_DeviceId,msptiObjectId_Ds_StreamId,msptiObjectId_Pt_ProcessId,msptiObjectId_Pt_ThreadId" << std::endl; + } + this->stop.store(false); this->run(); } - void stopWriter() - { - if (this->file.is_open()) - { + void stopWriter() { + if (this->file.is_open()) { { std::unique_lock lock(this->threadmtx); + // clean up the thread this->stop.store(true); } this->cv.notify_all(); - this->hcclActivityFormatToJson(); - if (this->writerThread.joinable()) - { + this->hcclActivityFormatToCSV(); + if (this->writerThread.joinable()){ this->writerThread.join(); } + // write the remaining buffer + std::cout << "Closing file" << std::endl; this->file.close(); this->opened.store(false); } } - ~MSPTIHcclFileWriter() { this->stopWriter(); } + ~MSPTIHcclFileWriter() { + this->stopWriter(); + } - bool fileExists(const std::string &fp) - { + bool fileExists(const std::string& fp) { std::ifstream file(fp.c_str()); return file.good() && file.is_open(); } - void bufferMarkerActivity(msptiActivityMarker *activity) - { + + void bufferMarkerActivity(msptiActivityMarker* activity) { std::lock_guard lock(this->bufferMarkerMtx); this->markerActivityBuffer->push_back(*activity); } - void run() - { + void run() { // a thread to periodically flush // the buffer to the file // watch the conditional variable for signal - this->writerThread = std::thread( - [this]() - { - while (!this->stop.load()) - { - std::unique_lock lock(this->threadmtx); - if (this->cv.wait_for(lock, std::chrono::seconds(5)) == - std::cv_status::timeout) - { - this->hcclActivityFormatToJson(); - } - else if (this->stop.load()) - { - break; - }; - } - }); + this->writerThread = std::thread([this](){ + while (!this->stop.load()) { + std::unique_lock lock(this->threadmtx); + if (this->cv.wait_for(lock, std::chrono::seconds(5)) == std::cv_status::timeout){ + this->hcclActivityFormatToCSV(); + } else if (this->stop.load()) { + break; + }; + } + }); + } - void hcclActivityFormatToJson() - { + void replaceCommasWithExclamation(const char* input, char* output) { + for (int i = 0; input[i] != '\0'; i++) { + if (input[i] == ',') { + output[i] = '!'; + } else { + output[i] = input[i]; + } + } + output[strlen(input)] = '\0'; + } + + void hcclActivityFormatToCSV() { std::lock_guard lock(this->buffermtx); - if (this->file.is_open()) - { - for (auto activity : *this->markerActivityBuffer) - { - Json::Value markerJson; - markerJson["Kind"] = activity.kind; - markerJson["SourceKind"] = activity.sourceKind; - markerJson["Timestamp"] = activity.timestamp; - markerJson["Id"] = activity.id; - markerJson["Flag"] = activity.flag; - Json::Value msptiObjecId; - if (activity.sourceKind == MSPTI_ACTIVITY_SOURCE_KIND_HOST) - { - Json::Value pt; - pt["ProcessId"] = activity.objectId.pt.processId; - pt["ThreadId"] = activity.objectId.pt.threadId; - Json::Value ds; - ds["DeviceId"] = activity.objectId.pt.processId; - ds["StreamId"] = activity.objectId.pt.threadId; - msptiObjecId["Pt"] = pt; - msptiObjecId["Ds"] = ds; - } - else if (activity.sourceKind == - MSPTI_ACTIVITY_SOURCE_KIND_DEVICE) - { - Json::Value ds; - ds["DeviceId"] = activity.objectId.ds.deviceId; - ds["StreamId"] = activity.objectId.ds.streamId; - Json::Value pt; - pt["ProcessId"] = activity.objectId.ds.deviceId; - pt["ThreadId"] = activity.objectId.ds.streamId; - msptiObjecId["Pt"] = pt; - msptiObjecId["Ds"] = ds; + if (this->file.is_open()) { + // enumerate the buffer and write to file + for (auto activity : *this->markerActivityBuffer) { + char result[strlen(activity.name) + 1]; + this->replaceCommasWithExclamation(activity.name, result); + // "Flag,Id,Kind,Name,SourceKind,Timestamp,msptiObjectId_Ds_DeviceId,msptiObjectId_Ds_StreamId,msptiObjectId_Pt_ProcessId,msptiObjectId_Pt_ThreadId"; + if (activity.sourceKind == MSPTI_ACTIVITY_SOURCE_KIND_HOST) { + this->file << activity.flag << "," << activity.id << "," << activity.kind << "," << result << "," << \ + activity.sourceKind<< "," << activity.timestamp << "," << activity.objectId.pt.processId << "," << activity.objectId.pt.threadId << "," << \ + activity.objectId.pt.processId << "," << activity.objectId.pt.threadId << std::endl; + }else if(activity.sourceKind == MSPTI_ACTIVITY_SOURCE_KIND_DEVICE) { + this->file << activity.flag << "," << activity.id << "," << activity.kind << "," << result << "," << \ + activity.sourceKind<< "," << activity.timestamp << "," << activity.objectId.ds.deviceId << "," << activity.objectId.ds.streamId << "," << \ + activity.objectId.ds.deviceId << "," << activity.objectId.ds.streamId << std::endl; } - markerJson["msptiObjectId"] = msptiObjecId; - markerJson["Name"] = activity.name; - this->root.append(markerJson); - } - if (this->root.size() > 0) - { - Json::StyledWriter writer; - this->file << writer.write(this->root); - this->root.clear(); } this->markerActivityBuffer->clear(); - } - else - { + } else { std::cout << "File is not open" << std::endl; } } -}; \ No newline at end of file +}; diff --git a/systrace/src/mspti/mspti_tracker.cpp b/systrace/src/mspti/mspti_tracker.cpp index aa87891..4f85ecd 100644 --- a/systrace/src/mspti/mspti_tracker.cpp +++ b/systrace/src/mspti/mspti_tracker.cpp @@ -20,7 +20,7 @@ inline uint8_t *align_buffer(uint8_t *buffer, size_t align) MSPTITracker::MSPTITracker() { std::cout << "Logging initialized from preloaded library." << std::endl; - std::string file_name = "hccl_activity-" + systrace::util::GetPrimaryIP() + "-.json"; + std::string file_name = "hccl_activity-" + systrace::util::GetPrimaryIP() + "-.csv"; hcclFileWriter = std::make_unique(file_name); msptiSubscribe(&subscriber, nullptr, nullptr); -- Gitee