diff --git a/0001-fix-lack-of-data-by-replacing-json-to-csv-and-detect.patch b/0001-fix-lack-of-data-by-replacing-json-to-csv-and-detect.patch new file mode 100644 index 0000000000000000000000000000000000000000..284c3ff7eb6016299749aa01f1bf313659670ee5 --- /dev/null +++ b/0001-fix-lack-of-data-by-replacing-json-to-csv-and-detect.patch @@ -0,0 +1,540 @@ +From af11818ad9b2c4d9cbc91c3a98ca6165bc88bb0c Mon Sep 17 00:00:00 2001 +From: root +Date: Wed, 4 Jun 2025 16:08:25 +0800 +Subject: [PATCH] fix lack of data by replacing json to csv and detecting align + data + +--- + .../failslow/dataloader/marker_data_reader.py | 54 ++++- + .../failslow/process/post_process.py | 18 +- + .../failslow/response/response.py | 3 +- + .../failslow/slow_node_detection.py | 19 +- + sysTrace/src/mspti/json_file_writer.h | 195 ++++++++---------- + sysTrace/src/mspti/mspti_tracker.cpp | 6 +- + 6 files changed, 161 insertions(+), 134 deletions(-) + +diff --git a/sysTrace-failslow/failslow/dataloader/marker_data_reader.py b/sysTrace-failslow/failslow/dataloader/marker_data_reader.py +index d4842fc..4f5c140 100644 +--- a/sysTrace-failslow/failslow/dataloader/marker_data_reader.py ++++ b/sysTrace-failslow/failslow/dataloader/marker_data_reader.py +@@ -41,6 +41,8 @@ class MarkerDataloader: + self.id2name_maps = dict() + self.local_d_files = dict() + self.local_op_launch_files = dict() ++ self.empty_data_ranks = [] ++ self.node_id2ranks_dict = {} + + @staticmethod + def read_csv(file_path): +@@ -174,14 +176,21 @@ class MarkerDataloader: + + device_df = self.extract_device_df(data_df) + op_launch_df = self.extract_op_launch_df(data_df) +- device_ids = int(device_df[TableItem.device_id].unique()[0]) +- ++ if len(device_df): ++ device_ids = int(device_df[TableItem.device_id].unique()[0]) ++ else: ++ device_ids = int(csv_file.split(".")[1]) ++ self.empty_data_ranks.append(device_ids) + # 分列以及生成start,end timestamp + device_df = self.process_df(device_df, csv_file) + op_launch_df = self.process_df(op_launch_df, csv_file) + self.save_device_df(device_df, csv_file) + self.save_op_launch_df(op_launch_df, csv_file) +- comm_groups_ids = device_df[TableItem.ex_comm_group].unique() ++ if len(device_df): ++ comm_groups_ids = device_df[TableItem.ex_comm_group].unique() ++ self.get_node_ids_from_comm_groups(comm_groups_ids, device_ids) ++ else: ++ comm_groups_ids = [] + selected_indices, comm_ops = self.get_ops_by_comm_name(comm_groups_ids, device_df) + count_ops = self.get_count_ops(comm_groups_ids, device_df) + +@@ -189,9 +198,34 @@ class MarkerDataloader: + comm_groups = self.create_comm_groups(comm_groups_ids, selected_indices, comm_ops, device_ids, count_ops) + self.extend_group_ranks(all_comm_groups, comm_groups) + ++ self.fix_node_ids2ranks() ++ logger.info(f"node id and ranks: {self.node_id2ranks_dict}") + all_comm_groups = self.get_fp_comm_groups(all_comm_groups) + return all_comm_groups + ++ def get_node_ids_from_comm_groups(self, comm_groups_ids: list, rank: int): ++ for comm_group_id in comm_groups_ids: ++ node_id = comm_group_id.split("%")[0] ++ if node_id not in self.node_id2ranks_dict.keys(): ++ self.node_id2ranks_dict[node_id] = [rank] ++ else: ++ self.node_id2ranks_dict[node_id].append(rank) ++ break ++ ++ def fix_node_ids2ranks(self): ++ for rank_empty in self.empty_data_ranks: ++ empty_ranks_interg = rank_empty // 8 ++ match_flag = False ++ for node_id, ranks in self.node_id2ranks_dict.items(): ++ if ranks: ++ node_rank_interg = ranks[0] // 8 ++ if node_rank_interg == empty_ranks_interg: ++ self.node_id2ranks_dict[node_id].append(rank_empty) ++ match_flag = True ++ break ++ if match_flag: ++ break ++ + def process_df(self, data_df: pd.DataFrame, csv_file: str, op_ext=None) -> pd.DataFrame: + """ + 对 DataFrame 进行处理,包括分组聚合、列拆分、添加新列等操作 +@@ -210,12 +244,14 @@ class MarkerDataloader: + metric_name = TableItem.ex_comm_op + if op_ext: + metric_name = f"{metric_name}_launch" +- if "!" in df["Name"].iloc[0]: +- df[[metric_name, TableItem.ex_comm_group, TableItem.ex_data_type, TableItem.ex_count]] = df[ +- TableItem.name].str.replace('comm:', '').str.split('!', expand=True) +- else: +- df[[metric_name, TableItem.ex_comm_group, TableItem.ex_data_type, TableItem.ex_count]] = df[ +- TableItem.name].str.replace('comm:', '').str.split(',', expand=True) ++ ++ if len(df): ++ if "!" in df["Name"].iloc[0]: ++ df[[metric_name, TableItem.ex_comm_group, TableItem.ex_data_type, TableItem.ex_count]] = df[ ++ TableItem.name].str.replace('comm:', '').str.split('!', expand=True) ++ else: ++ df[[metric_name, TableItem.ex_comm_group, TableItem.ex_data_type, TableItem.ex_count]] = df[ ++ TableItem.name].str.replace('comm:', '').str.split(',', expand=True) + + return df + +diff --git a/sysTrace-failslow/failslow/process/post_process.py b/sysTrace-failslow/failslow/process/post_process.py +index c662a5e..67c9e69 100644 +--- a/sysTrace-failslow/failslow/process/post_process.py ++++ b/sysTrace-failslow/failslow/process/post_process.py +@@ -21,11 +21,12 @@ logger = get_default_logger(__name__) + + + class PostProcess(): +- def __init__(self, metric_args, model_args): ++ def __init__(self, metric_args, model_args, node_ip2ranks): + self.metric_args = metric_args + self.model_args = model_args + self.max_num_normal_results = self.model_args.get("max_num_normal_results", 10) + self.record_kpi_value = self.model_args.get("record_kpi", False) ++ self.node_ip2ranks = node_ip2ranks + + def gen_final_alarm(self, detect_results: List): + response = AIJobDetectResult() +@@ -73,12 +74,22 @@ class PostProcess(): + break # Assuming only one SPACE type is considered for simplicity. + return keep_devices, omitted_devices + ++ def get_node_id_by_rank(self, rank): ++ node_id = "localhost" ++ for tmp_node_ip, ranks in self.node_ip2ranks.items(): ++ if rank in ranks: ++ node_id = tmp_node_ip ++ break ++ ++ return node_id ++ + def _process_abnormal_device(self, detect_result: Dict, device_label: str, keep_devices: List, + omitted_devices: List, metric_name: str) -> NodeData: + method_type = detect_result["detect_result_type"][device_label].get(metric_name, "TIME") + time_stamp_data, values = detect_result["anomaly_locations"][device_label][metric_name] + label_dict = dict(zip(time_stamp_data.tolist(), values.tolist())) +- abnormal_node_data = NodeData(metric_name, device_label, method_type, keep_devices, omitted_devices) ++ node_ip = self.get_node_id_by_rank(device_label) ++ abnormal_node_data = NodeData(metric_name, device_label, method_type, node_ip, keep_devices, omitted_devices) + + if self.record_kpi_value: + g_ts, g_value = detect_result["group_data"][device_label].values[:, 0], detect_result["group_data"][ +@@ -93,7 +104,8 @@ class PostProcess(): + metric_name: str): + if keep_devices: + for device_label in keep_devices: +- normal_node_data = NodeData(metric_name, device_label, "SPACE") ++ node_ip = self.get_node_id_by_rank(device_label) ++ normal_node_data = NodeData(metric_name, device_label, "SPACE", node_ip) + if self.record_kpi_value: + g_ts, g_value = detect_result["group_data"][device_label].values[:, 0], detect_result["group_data"][ + device_label].values[:, +diff --git a/sysTrace-failslow/failslow/response/response.py b/sysTrace-failslow/failslow/response/response.py +index ae15a3a..3f126b2 100644 +--- a/sysTrace-failslow/failslow/response/response.py ++++ b/sysTrace-failslow/failslow/response/response.py +@@ -97,11 +97,10 @@ class NodeData(dict): + rela_ids = typed_property("relaIds", list, False) + omitted_devices = typed_property("omittedDevices", list, False) + +- def __init__(self, metric_name, device_label, method_type, relate_device_labels=None, omitted_devices=None): ++ def __init__(self, metric_name, device_label, method_type, server_ip="localhost", relate_device_labels=None, omitted_devices=None): + super().__init__() + # device_label: 1 + sys_id = device_label +- server_ip = "localhost" + self.object_id = str(sys_id) + self.server_ip = server_ip + self.device_info = f"rank_{device_label}" +diff --git a/sysTrace-failslow/failslow/slow_node_detection.py b/sysTrace-failslow/failslow/slow_node_detection.py +index b1e6700..d1a9f65 100644 +--- a/sysTrace-failslow/failslow/slow_node_detection.py ++++ b/sysTrace-failslow/failslow/slow_node_detection.py +@@ -47,7 +47,8 @@ class SlowNodeDetector: + self.hccl_domains = self._init_hccl_domains() + + self.aggregate_method = {} +- self.post_process = PostProcess(metric_args, model_args) ++ node_ip2ranks = self.dataloader.node_id2ranks_dict ++ self.post_process = PostProcess(metric_args, model_args, node_ip2ranks) + self.group_anomaly_detector = GroupAnomalyDetector() + self.enable_detect_type = self.model_args.get("enable_detect_type", {}) + self.fail_slow_ops = self.model_args.get("fail_slow_ops", {}) +@@ -272,9 +273,11 @@ class SlowNodeDetector: + logger.info(f"Skip space nodes compare.") + + # 时间空间结果融合 ++ # 未采集数据记录,提醒重新验证测试 ++ empty_ranks = self.dataloader.empty_data_ranks + anomaly_locations, detect_result_type = self.group_anomaly_detector.time_space_agg(time_anomaly_locations, + space_anomaly_locations, +- metric_name) ++ metric_name, empty_ranks) + anomaly_devices = self.output_anomaly_devices(metric_name, anomaly_locations) + detection_results["anomaly_devices"] = anomaly_devices + detection_results["anomaly_locations"] = anomaly_locations +@@ -384,8 +387,8 @@ class SlowNodeDetector: + for comm_group in self.comm_groups: + comm_name = comm_group.comm_name + group_ranks_list = comm_group.group_ranks +- if (not is_same_list(group_ranks_list, target_group)) or ( +- not is_continuous(group_ranks_list)): ++ if (not is_same_list(group_ranks_list, target_group)): ++ logger.info(f"jump detection. Please check group ranks: {group_ranks_list}") + continue + + logger.info(f"Start Calculating Slow Detect in Group {group_ranks_list}.") +@@ -504,9 +507,8 @@ class GroupAnomalyDetector: + pass + + @staticmethod +- def time_space_agg(time_anomaly_locations, space_anomaly_locations, metric_name): ++ def time_space_agg(time_anomaly_locations, space_anomaly_locations, metric_name, empty_ranks): + detect_result_type = {} +- + for node_id in time_anomaly_locations.keys(): + time_ret = np.sum(time_anomaly_locations[node_id][metric_name][1]) + if space_anomaly_locations: +@@ -522,6 +524,11 @@ class GroupAnomalyDetector: + else: + detect_result_type.setdefault(node_id, {}).setdefault(metric_name, "TIME") + ++ for empty_rank in empty_ranks: ++ detect_result_type.setdefault(empty_rank, {}).setdefault(metric_name, "EMPTY") ++ timestamp = time_anomaly_locations[node_id][metric_name][0] ++ time_anomaly_locations[empty_rank] = {metric_name: (timestamp, np.ones(len(timestamp)))} ++ + return time_anomaly_locations, detect_result_type + + def time_node_compare(self, metric_name: str, cfg: Dict, detect_data: Dict): +diff --git a/sysTrace/src/mspti/json_file_writer.h b/sysTrace/src/mspti/json_file_writer.h +index fc46d31..932ca09 100644 +--- a/sysTrace/src/mspti/json_file_writer.h ++++ b/sysTrace/src/mspti/json_file_writer.h +@@ -1,4 +1,6 @@ + #pragma once ++#include "../../include/common/shared_constants.h" ++#include "../../include/common/util.h" + #include "mspti.h" + #include + #include +@@ -10,176 +12,147 @@ + #include + #include + +-class MSPTIHcclFileWriter +-{ +- private: ++class MSPTIHcclFileWriter { ++private: + std::ofstream file; + std::mutex buffermtx; + std::mutex bufferMarkerMtx; + std::mutex threadmtx; +- std::atomic opened; ++ std::atomic opened; + std::unique_ptr> markerActivityBuffer; + std::thread writerThread; + std::condition_variable cv; + std::atomic stop; +- Json::Value root = Json::Value(Json::ValueType::arrayValue); + +- public: +- MSPTIHcclFileWriter(const std::string &filename) +- { ++public: ++ MSPTIHcclFileWriter(const std::string& filename) { + // obtain environment variable LOCAL_RANK + // to determine the rank of the process + // and append it to the filename +- const char *savePath = std::getenv("METRIC_PATH"); +- if (savePath == nullptr) +- { ++ const char* savePath = std::getenv("METRIC_PATH"); ++ if (savePath == nullptr) { + savePath = "/var/log"; + } + std::string savePathStr = savePath; +- if (!savePathStr.empty() && savePathStr.back() != '/') +- { ++ if (!savePathStr.empty() && savePathStr.back() != '/') { + savePathStr += "/"; + } + std::string saveFilename = savePathStr + filename; + std::string filenameWithRank = saveFilename; +- this->markerActivityBuffer = +- std::make_unique>(); ++ this->markerActivityBuffer = std::make_unique>(); + +- const char *localRankCStr = std::getenv("RANK"); +- if (localRankCStr == nullptr) +- { ++ const char* localRankCStr = std::getenv("RANK"); ++ if (localRankCStr == nullptr) { + localRankCStr = "-1"; + } +- std::string localRank = +- localRankCStr; // Now safe to construct std::string +- auto rank = std::stoi(localRank); +- if (saveFilename.length() >= 5 && +- saveFilename.substr(saveFilename.length() - 5) == ".json") +- { +- std::string baseName = +- saveFilename.substr(0, saveFilename.length() - 5); +- filenameWithRank = baseName + "." + std::to_string(rank) + ".json"; +- } +- else +- { ++ std::string localRank = localRankCStr; // Now safe to construct std::string ++ auto rank = std::stoi(localRank); ++ if (saveFilename.length() >= 4 && saveFilename.substr(saveFilename.length() - 4) == ".csv") { ++ std::string baseName = saveFilename.substr(0, saveFilename.length() - 4); ++ filenameWithRank = baseName + "." + std::to_string(rank) + ".csv"; ++ } else { + filenameWithRank = saveFilename + "." + std::to_string(rank); + } + std::cout << "Filename: " << filenameWithRank << std::endl; +- this->file.open(filenameWithRank, std::ios::out | std::ios::app); +- this->opened.store(true); ++ ++ // if file does not exists ++ // create it and write header ++ if (this->fileExists(filenameWithRank)) { ++ this->file.open(filenameWithRank, std::ios::out | std::ios::app); ++ this->opened.store(true); ++ } else { ++ this->file.open(filenameWithRank, std::ios::out | std::ios::app); ++ this->opened.store(true); ++ this->file << "Flag,Id,Kind,Name,SourceKind,Timestamp,msptiObjectId_Ds_DeviceId,msptiObjectId_Ds_StreamId,msptiObjectId_Pt_ProcessId,msptiObjectId_Pt_ThreadId" << std::endl; ++ } ++ + this->stop.store(false); + this->run(); + } + +- void stopWriter() +- { +- if (this->file.is_open()) +- { ++ void stopWriter() { ++ if (this->file.is_open()) { + { + std::unique_lock lock(this->threadmtx); ++ // clean up the thread + this->stop.store(true); + } + this->cv.notify_all(); +- this->hcclActivityFormatToJson(); +- if (this->writerThread.joinable()) +- { ++ this->hcclActivityFormatToCSV(); ++ if (this->writerThread.joinable()){ + this->writerThread.join(); + } ++ // write the remaining buffer ++ std::cout << "Closing file" << std::endl; + this->file.close(); + this->opened.store(false); + } + } + +- ~MSPTIHcclFileWriter() { this->stopWriter(); } ++ ~MSPTIHcclFileWriter() { ++ this->stopWriter(); ++ } + +- bool fileExists(const std::string &fp) +- { ++ bool fileExists(const std::string& fp) { + std::ifstream file(fp.c_str()); + return file.good() && file.is_open(); + } + +- void bufferMarkerActivity(msptiActivityMarker *activity) +- { ++ ++ void bufferMarkerActivity(msptiActivityMarker* activity) { + std::lock_guard lock(this->bufferMarkerMtx); + this->markerActivityBuffer->push_back(*activity); + } + +- void run() +- { ++ void run() { + // a thread to periodically flush + // the buffer to the file + // watch the conditional variable for signal +- this->writerThread = std::thread( +- [this]() +- { +- while (!this->stop.load()) +- { +- std::unique_lock lock(this->threadmtx); +- if (this->cv.wait_for(lock, std::chrono::seconds(5)) == +- std::cv_status::timeout) +- { +- this->hcclActivityFormatToJson(); +- } +- else if (this->stop.load()) +- { +- break; +- }; +- } +- }); ++ this->writerThread = std::thread([this](){ ++ while (!this->stop.load()) { ++ std::unique_lock lock(this->threadmtx); ++ if (this->cv.wait_for(lock, std::chrono::seconds(5)) == std::cv_status::timeout){ ++ this->hcclActivityFormatToCSV(); ++ } else if (this->stop.load()) { ++ break; ++ }; ++ } ++ }); ++ ++ } ++ ++ void replaceCommasWithExclamation(const char* input, char* output) { ++ for (int i = 0; input[i] != '\0'; i++) { ++ if (input[i] == ',') { ++ output[i] = '!'; ++ } else { ++ output[i] = input[i]; ++ } ++ } ++ output[strlen(input)] = '\0'; + } + +- void hcclActivityFormatToJson() +- { ++ void hcclActivityFormatToCSV() { + std::lock_guard lock(this->buffermtx); +- if (this->file.is_open()) +- { +- for (auto activity : *this->markerActivityBuffer) +- { +- Json::Value markerJson; +- markerJson["Kind"] = activity.kind; +- markerJson["SourceKind"] = activity.sourceKind; +- markerJson["Timestamp"] = activity.timestamp; +- markerJson["Id"] = activity.id; +- markerJson["Flag"] = activity.flag; +- Json::Value msptiObjecId; +- if (activity.sourceKind == MSPTI_ACTIVITY_SOURCE_KIND_HOST) +- { +- Json::Value pt; +- pt["ProcessId"] = activity.objectId.pt.processId; +- pt["ThreadId"] = activity.objectId.pt.threadId; +- Json::Value ds; +- ds["DeviceId"] = activity.objectId.pt.processId; +- ds["StreamId"] = activity.objectId.pt.threadId; +- msptiObjecId["Pt"] = pt; +- msptiObjecId["Ds"] = ds; +- } +- else if (activity.sourceKind == +- MSPTI_ACTIVITY_SOURCE_KIND_DEVICE) +- { +- Json::Value ds; +- ds["DeviceId"] = activity.objectId.ds.deviceId; +- ds["StreamId"] = activity.objectId.ds.streamId; +- Json::Value pt; +- pt["ProcessId"] = activity.objectId.ds.deviceId; +- pt["ThreadId"] = activity.objectId.ds.streamId; +- msptiObjecId["Pt"] = pt; +- msptiObjecId["Ds"] = ds; ++ if (this->file.is_open()) { ++ // enumerate the buffer and write to file ++ for (auto activity : *this->markerActivityBuffer) { ++ char result[strlen(activity.name) + 1]; ++ this->replaceCommasWithExclamation(activity.name, result); ++ // "Flag,Id,Kind,Name,SourceKind,Timestamp,msptiObjectId_Ds_DeviceId,msptiObjectId_Ds_StreamId,msptiObjectId_Pt_ProcessId,msptiObjectId_Pt_ThreadId"; ++ if (activity.sourceKind == MSPTI_ACTIVITY_SOURCE_KIND_HOST) { ++ this->file << activity.flag << "," << activity.id << "," << activity.kind << "," << result << "," << \ ++ activity.sourceKind<< "," << activity.timestamp << "," << activity.objectId.pt.processId << "," << activity.objectId.pt.threadId << "," << \ ++ activity.objectId.pt.processId << "," << activity.objectId.pt.threadId << std::endl; ++ }else if(activity.sourceKind == MSPTI_ACTIVITY_SOURCE_KIND_DEVICE) { ++ this->file << activity.flag << "," << activity.id << "," << activity.kind << "," << result << "," << \ ++ activity.sourceKind<< "," << activity.timestamp << "," << activity.objectId.ds.deviceId << "," << activity.objectId.ds.streamId << "," << \ ++ activity.objectId.ds.deviceId << "," << activity.objectId.ds.streamId << std::endl; + } +- markerJson["msptiObjectId"] = msptiObjecId; +- markerJson["Name"] = activity.name; +- this->root.append(markerJson); +- } +- if (this->root.size() > 0) +- { +- Json::StyledWriter writer; +- this->file << writer.write(this->root); +- this->root.clear(); + } + this->markerActivityBuffer->clear(); +- } +- else +- { ++ } else { + std::cout << "File is not open" << std::endl; + } + } +-}; +\ No newline at end of file ++}; +diff --git a/sysTrace/src/mspti/mspti_tracker.cpp b/sysTrace/src/mspti/mspti_tracker.cpp +index d6f1285..dd0f06e 100644 +--- a/sysTrace/src/mspti/mspti_tracker.cpp ++++ b/sysTrace/src/mspti/mspti_tracker.cpp +@@ -19,7 +19,7 @@ MSPTITracker::MSPTITracker() + { + std::cout << "Logging initialized from preloaded library." << std::endl; + hcclFileWriter = +- std::make_unique("hccl_activity.json"); ++ std::make_unique("hccl_activity.csv"); + msptiSubscribe(&subscriber, nullptr, nullptr); + msptiActivityRegisterCallbacks(UserBufferRequest, UserBufferComplete); + msptiActivityEnable(MSPTI_ACTIVITY_KIND_MARKER); +@@ -60,11 +60,11 @@ void MSPTITracker::UserBufferRequest(uint8_t **buffer, size_t *size, + { + auto &instance = getInstance(); + std::lock_guard lock(mtx); +- constexpr uint32_t SIZE = (uint32_t)MB * 1; ++ constexpr uint32_t SIZE = (uint32_t)MB * 10; + instance.requestedCount.fetch_add(1); + uint8_t *pBuffer = (uint8_t *)malloc(SIZE + ALIGN_SIZE); + *buffer = align_buffer(pBuffer, ALIGN_SIZE); +- *size = MB * 1; ++ *size = MB * 10; + *maxNumRecords = 0; + } + +-- +2.33.0 + diff --git a/sysTrace.spec b/sysTrace.spec index c1d61be255bdf0b9ff15c498e5e80190d206561b..674b62ba2ce40f82cd616408fd3982a32d5e44f6 100644 --- a/sysTrace.spec +++ b/sysTrace.spec @@ -2,7 +2,7 @@ %global _enable_debug_packages 0 Name: sysTrace Version: 1.0 -Release: 3%{?dist} +Release: 4%{?dist} Summary: System Tracing Library with Fail Slow Detection License: GPLv2 AND MulanPSL2 URL: https://gitee.com/src-openeuler/sysTrace @@ -14,7 +14,7 @@ BuildRequires: boost-devel abseil-cpp-devel openssl-devel BuildRequires: protobuf-devel protobuf-compiler jsoncpp-devel protobuf-c protobuf-c-devel Patch0001: fix-typo-error-in-mspti.patch - +Patch0002: 0001-fix-lack-of-data-by-replacing-json-to-csv-and-detect.patch %description sysTrace is a system tracing library that provides low-level system tracing capabilities for debugging and performance analysis, bundled @@ -86,6 +86,9 @@ getent passwd systrace >/dev/null || \ %{_unitdir}/systrace-failslow.service %changelog +* Wed Jun 4 2025 liwei - 1.0-4 +- fix typo in mspti + * Thu May 22 2025 liwei <1289113577@qq.com> - 1.0-3 - fix typo in mspti