diff --git a/failslow/failslow/process/__init__.py b/failslow/failslow/process/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..9402b9c4fb2581383c934827e4cf8369af960d99
--- /dev/null
+++ b/failslow/failslow/process/__init__.py
@@ -0,0 +1,18 @@
+# coding=utf-8
+"""
+Copyright (c) Huawei Technologies Co., Ltd. 2020-2028. All rights reserved.
+Description:
+FileName:__init__.py
+Author: c00570162/congdechun
+Create Date: 2025/5/9 14:24
+Notes:
+
+"""
+from failslow.process.convert_mspti_timeline import convert_mspti_timeline
+
+def main():
+    pass
+
+
+if __name__ == "__main__":
+    main()
diff --git a/failslow/failslow/process/convert_json2csv.py b/failslow/failslow/process/convert_json2csv.py
new file mode 100644
index 0000000000000000000000000000000000000000..42c8b3c296f7e4b96001e2c661d951b19dbe8eb8
--- /dev/null
+++ b/failslow/failslow/process/convert_json2csv.py
@@ -0,0 +1,60 @@
+# coding=utf-8
+"""
+Copyright (c) Huawei Technologies Co., Ltd. 2020-2028. All rights reserved.
+Description:
+FileName:convert_json2csv.py
+Author: c00570162/congdechun
+Create Date: 2025/3/28 16:17
+Notes:
+
+"""
+import os
+import json
+import pandas as pd
+from failslow.util.logging_utils import get_default_logger
+
+logger = get_default_logger(__name__)
+
+
+def convert_json2csv(json_path):
+    csv_path = f"{json_path[:-5]}.csv"
+    if os.path.exists(csv_path):
+        return
+
+    try:
+        with open(json_path, 'r', encoding='utf-8') as file:
+            content = file.read()
+            # The dump may contain several concatenated JSON arrays; join them into one.
+            content = content.replace(']\n[', ',').strip()
+            json_data = json.loads(content)
+    except (OSError, json.JSONDecodeError) as err:
+        logger.error(f"failed to read json data from {json_path}: {err}")
+        json_data = None
+
+    if not json_data:
+        return
+    df = pd.json_normalize(json_data, sep='_')
+
+    logger.info(f"save path: {csv_path}")
+    df.to_csv(csv_path, index=False)
+
+
+def convert_jsons2csv(root_path):
+    if not os.path.exists(root_path):
+        return
+    json_files = [file for file in os.listdir(root_path) if file.endswith(".json")]
+
+    for json_file in json_files:
+        if "hccl_activity" not in json_file:
+            continue
+        logger.info(f"converting {json_file}")
+
+        json_path = os.path.join(root_path, json_file)
+        convert_json2csv(json_path)
+
+
+if __name__ == "__main__":
+    # json_path = "./data/json_data/hccl_activity.3.json"
+    # convert_json2csv(json_path)
+
+    root_path = "./data/json_tp4dp1"
+    convert_jsons2csv(root_path)
diff --git a/failslow/failslow/process/convert_mspti_timeline.py b/failslow/failslow/process/convert_mspti_timeline.py
new file mode 100644
index 0000000000000000000000000000000000000000..7b1eb7ca2bb910f79f99ddc2865a5e009631399c
--- /dev/null
+++ b/failslow/failslow/process/convert_mspti_timeline.py
@@ -0,0 +1,139 @@
+# coding=utf-8
+"""
+Copyright (c) Huawei Technologies Co., Ltd. 2020-2028. All rights reserved.
+Description:
+FileName:convert_mspti_timeline.py
+Author: c00570162/congdechun
+Create Date: 2025/3/26 11:23
+Notes:
+
+"""
+import os
+import json
+import pandas as pd
+from failslow.process.convert_json2csv import convert_jsons2csv
+
+__all__ = ['convert_mspti_timeline']
+
+MODE = {
+    0: "Host",
+    1: "Device"
+}
+OP_COLORS = {
+    'HcclAllreduce': "good",
+    'HcclAllReduce': "good",
+    'HcclAllGather': "bad",
+    'HcclBroadcast': "yellow",
+    'HcclReduceScatter': "olive",
+    'HcclSend': "good",
+    'HcclReceive': "good",
+    'HcclBatchSendRecv': "thread_state_runnable"
+}
+
+
+def create_args(row):
+    return {
+        "id": row["Id"],
+        "comm_group": row["comm_group"],
+        "count": row["count"]
+    }
+
+
+def split_df(df):
+    """
+    Split the DataFrame into a host part and a device part according to the SourceKind column.
+    """
+    df_host = df[df['SourceKind'] == 0]
+    df_device = df[df['SourceKind'] == 1]
+    return df_host, df_device
+
+
+def process_df(data_df, device_id, id2name_dict: dict):
+    """
+    Process the DataFrame: aggregate by operator Id, split the Name column,
+    and add the columns required by the trace-event format.
+    """
+    data_df["Name"] = data_df['Id'].map(id2name_dict)
+    df = data_df.groupby('Id').agg({
+        'Timestamp': ['min', 'max'],
+        'Kind': 'first',
+        'SourceKind': 'first',
+        'Name': 'first',
+    }).reset_index()
+    df.columns = ['Id', 'start', 'end', 'Kind', 'SourceKind', 'Name']
+    if len(df):
+        # Name looks like "comm:<op><sep><group><sep><dtype><sep><count>"; the separator
+        # is "!" in some dumps and "," in others.
+        if "!" in df["Name"].iloc[0]:
+            df[['comm_op', 'comm_group', 'data_type', 'count']] = (
+                df['Name'].str.replace('comm:', '').str.split('!', expand=True))
+        else:
+            df[['comm_op', 'comm_group', 'data_type', 'count']] = (
+                df['Name'].str.replace('comm:', '').str.split(',', expand=True))
+    df = df.drop(columns=['Name'])
+    df['cat'] = "hccl"
+    df['name'] = df['comm_op']
+    df['cname'] = df['comm_op'].map(OP_COLORS)
+    df['end'] = df['end'] / 1000.
+    df['start'] = df['start'] / 1000.
+    df['dur'] = df['end'] - df['start']
+    df['ph'] = "X"
+    df['pid'] = f"rank_{device_id}"
+    df['tid'] = df["SourceKind"].map(MODE)
+    df['args'] = df.apply(create_args, axis=1)
+    result = df[['cat', 'name', 'ph', 'pid', 'tid', 'start', 'dur', 'cname', 'args']].rename(
+        columns={'start': 'ts'}).to_dict(orient='records')
+    return result
+
+
+def process_files(root_path, debug: bool = False):
+    """
+    Process all CSV files under the given path.
+    """
+    csv_files = [file for file in os.listdir(root_path) if file.endswith(".csv") and "device" not in file]
+    all_ranks = []
+    for csv_file in csv_files:
+        if "op_launch" in csv_file:
+            continue
+        print(f"processing file: {csv_file}")
+        csv_file_path = os.path.join(root_path, csv_file)
+        df = pd.read_csv(csv_file_path)
+        if debug:
+            df = df.head(12)
+
+        id2name_dict = df[df['Name'].notna()].set_index('Id')['Name'].to_dict()
+        # df['name'] = df.groupby('id')['name'].transform(lambda x: x.ffill().bfill())
+        df_host, df_device = split_df(df)
+        device_id = df_device['msptiObjectId_Ds_DeviceId'].unique()[0]
+        host_result = process_df(df_host, device_id, id2name_dict)
+        all_ranks.extend(host_result)
+        device_result = process_df(df_device, device_id, id2name_dict)
+        all_ranks.extend(device_result)
+    return all_ranks
+
+
+def save_to_json(all_ranks, files_path):
+    """
+    Save the processed events as a JSON timeline file.
+    """
+    output = {
+        "traceEvents": all_ranks,
+        "stackFrames": {}
+    }
+    json_output = json.dumps(output, indent=4)
+    with open(os.path.join(files_path, 'mspti_comm_ops_timeline.json'), 'w') as f:
+        f.write(json_output)
+
+
+def convert_mspti_timeline(data_path: str):
+    '''
+    :param data_path: path of the data collected by mspti
+    :return: None
+    '''
+    convert_jsons2csv(data_path)
+    all_ranks = process_files(data_path)
+    save_to_json(all_ranks, data_path)
+
+
+if __name__ == "__main__":
+    files_path = "./data/cal_op_0506"
"./data/cal_op_0506" + convert_mspti_timeline(files_path) diff --git a/failslow/failslow/process/post_process.py b/failslow/failslow/process/post_process.py new file mode 100644 index 0000000000000000000000000000000000000000..336417d1cdd1397fc583e3a21e49bb0c4a22dfd9 --- /dev/null +++ b/failslow/failslow/process/post_process.py @@ -0,0 +1,217 @@ +# coding=utf-8 +""" +Copyright (c) Huawei Technologies Co., Ltd. 2020-2028. All rights reserved. +Description: +FileName:post_process.py +Author: c00570162/congdechun +Create Date: 2025/3/8 14:52 +Notes: + +""" +import traceback +import numpy as np +from datetime import datetime, timezone +from typing import Dict, Tuple, List +from collections import Counter + +from failslow.response import AIJobDetectResult, NodeData, ResultCode +from failslow.util.logging_utils import get_default_logger + +logger = get_default_logger(__name__) + + +class PostProcess(): + def __init__(self, metric_args, model_args, node_ip2ranks): + self.metric_args = metric_args + self.model_args = model_args + self.max_num_normal_results = self.model_args.get("max_num_normal_results", 10) + self.record_kpi_value = self.model_args.get("record_kpi", False) + self.node_ip2ranks = node_ip2ranks + + def gen_final_alarm(self, detect_results: List): + response = AIJobDetectResult() + response.timestamp = int(datetime.now(timezone.utc).astimezone().astimezone().timestamp()) + all_anomaly_nodes = [] + + for index, result in enumerate(detect_results): + try: + aomaly_devices = result.get("anomaly_devices") + all_anomaly_nodes.extend(aomaly_devices) + self.group_detect_ret_agg(response, result) + except Exception: + logger.error(traceback.format_exc()) + logger.info("Group accomplishment: %s/%s", index + 1, len(detect_results)) + + return response, all_anomaly_nodes + def group_detect_ret_agg(self, response: AIJobDetectResult, detect_result: Dict): + anomaly_device_labels = detect_result.get("anomaly_devices", []) + if not anomaly_device_labels: + return + + response.result_code = ResultCode.anomaly + metric_name = detect_result["metric_name"] + kpi_params = self.metric_args.get(metric_name, {}) + response(kpi_params.get('type', "compute")) + + keep_devices, omitted_devices = self._determine_keep_omitted_devices(detect_result, anomaly_device_labels, + metric_name) + for device_label in anomaly_device_labels: + abnormal_node_data = self._process_abnormal_device(detect_result, device_label, keep_devices, + omitted_devices, metric_name) + response.abnormal_detail.append(abnormal_node_data) + + self._add_normal_devices(response, detect_result, keep_devices, metric_name) + + def _determine_keep_omitted_devices(self, detect_result: Dict, anomaly_device_labels: List, metric_name: str) -> ( + List, List): + keep_devices, omitted_devices = [], [] + for device_label in anomaly_device_labels: + method_type = detect_result.get("detect_result_type", {}).get(device_label, {}).get(metric_name, "TIME") + if method_type == "SPACE": + normal_devices = sorted(set(detect_result["group_data"].keys()) - set(anomaly_device_labels)) + keep_devices = normal_devices[:self.max_num_normal_results] + omitted_devices = normal_devices[self.max_num_normal_results:] + break # Assuming only one SPACE type is considered for simplicity. 
+        return keep_devices, omitted_devices
+
+    def get_node_id_by_rank(self, rank):
+        node_id = "localhost"
+        for tmp_node_ip, ranks in self.node_ip2ranks.items():
+            if rank in ranks:
+                node_id = tmp_node_ip
+                break
+
+        return node_id
+
+    def _process_abnormal_device(self, detect_result: Dict, device_label: str, keep_devices: List,
+                                 omitted_devices: List, metric_name: str) -> NodeData:
+        method_type = detect_result["detect_result_type"][device_label].get(metric_name, "TIME")
+        time_stamp_data, values = detect_result["anomaly_locations"][device_label][metric_name]
+        label_dict = dict(zip(time_stamp_data.tolist(), values.tolist()))
+        node_ip = self.get_node_id_by_rank(device_label)
+        abnormal_node_data = NodeData(metric_name, device_label, method_type, node_ip, keep_devices, omitted_devices)
+
+        if self.record_kpi_value:
+            group_values = detect_result["group_data"][device_label].values
+            g_ts, g_value = group_values[:, 0], group_values[:, 1]
+            kpi_data = [{str(key): str(value), "abnormal": label_dict.get(key, 0)} for key, value in
+                        sorted(zip(g_ts.tolist(), g_value.tolist()), key=lambda x: x[0])]
+            abnormal_node_data.kpi_data = kpi_data
+
+        return abnormal_node_data
+
+    def _add_normal_devices(self, response: AIJobDetectResult, detect_result: Dict, keep_devices: List,
+                            metric_name: str):
+        if keep_devices:
+            for device_label in keep_devices:
+                node_ip = self.get_node_id_by_rank(device_label)
+                normal_node_data = NodeData(metric_name, device_label, "SPACE", node_ip)
+                if self.record_kpi_value:
+                    group_values = detect_result["group_data"][device_label].values
+                    g_ts, g_value = group_values[:, 0], group_values[:, 1]
+                    kpi_data = [{str(key): str(value)} for key, value in zip(g_ts.tolist(), g_value.tolist())]
+                    normal_node_data.kpi_data = kpi_data
+                response.normal_detail.append(normal_node_data)
+
+    def process_comm_slow_result(self, slow_results: list, slow_group: list, group_ranks: list) -> list:
+        '''
+        Target: find the most anomalous rank in the PP group.
+        Step 1: count how many times each rank is reported as abnormal and turn the count into a score.
+        Step 2: for every rank whose score is 1.0, if an adjacent rank's score is also >= 0.5,
+                the rank is considered abnormal.
+        Step 3: if several adjacent ranks satisfy step 2, further check whether the TP or DP
+                communication groups they belong to are also slow, and keep only the ranks that
+                appear in a slow group.
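+
+        Illustrative example (hypothetical ranks and scores, not taken from real data):
+            group_ranks = [0, 1, 2, 3] and four detection rounds flag rank 1 four times and
+            rank 2 twice -> anomaly_scores = {0: 0.0, 1: 1.0, 2: 0.5, 3: 0.0}. Rank 1 scores
+            1.0 and its neighbour rank 2 scores >= 0.5, so rank 1 is kept as the most
+            anomalous rank (before the optional slow-group filter of step 3).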
+        '''
+        anomaly_scores = self.calculate_anomaly_scores(slow_results, group_ranks)
+        most_anomaly_ranks = self.find_most_anomaly_ranks(anomaly_scores, group_ranks, slow_group)
+        detection_results = self.merge_slow_results(slow_results, most_anomaly_ranks)
+
+        return [detection_results]
+
+    def calculate_anomaly_scores(self, slow_results: list, group_ranks: list) -> dict:
+        anomaly_ranks = [rank for slow_result in slow_results for rank in slow_result["anomaly_devices"]]
+        anomaly_ranks_dict = Counter(anomaly_ranks)
+        detected_times = len(slow_results)
+
+        return {rank: anomaly_ranks_dict.get(rank, 0) / detected_times for rank in group_ranks}
+
+    def find_most_anomaly_ranks(self, anomaly_scores: dict, group_ranks: list, slow_groups: list) -> list:
+        most_anomaly_ranks = []
+        for index, rank in enumerate(group_ranks):
+            if anomaly_scores[rank] == 1.:
+                if index > 0 and anomaly_scores[group_ranks[index - 1]] >= 0.5:
+                    most_anomaly_ranks.append(rank)
+                if index < len(group_ranks) - 1 and anomaly_scores[group_ranks[index + 1]] >= 0.5:
+                    most_anomaly_ranks.append(rank)
+
+        filter_anomaly_ranks = []
+        for rank in most_anomaly_ranks:
+            for slow_group in slow_groups:
+                if rank in ast.literal_eval(slow_group):
+                    filter_anomaly_ranks.append(rank)
+                    break
+        if filter_anomaly_ranks:
+            most_anomaly_ranks = filter_anomaly_ranks
+
+        if len(group_ranks) == 2 and len(most_anomaly_ranks) == 2:
+            filter_anomaly_ranks = []
+            for rank in most_anomaly_ranks:
+                for slow_group in anomaly_scores:
+                    if rank in ast.literal_eval(slow_group):
+                        filter_anomaly_ranks.append(rank)
+                        break
+            if filter_anomaly_ranks:
+                most_anomaly_ranks = filter_anomaly_ranks
+
+        return most_anomaly_ranks
+
+    def merge_slow_results(self, slow_results: list, most_anomaly_ranks: list) -> dict:
+        if len(slow_results) <= 1:
+            return {}
+
+        metric_name = slow_results[0].get("metric_name", "no_metric").split("!")[0]
+        merged_anomaly_locations, merged_anomaly_type = {}, {}
+
+        for slow_result in slow_results:
+            self._merge_anomaly_locations(merged_anomaly_locations, slow_result['anomaly_locations'])
+            self._merge_anomaly_types(merged_anomaly_type, slow_result['detect_result_type'])
+
+        return {
+            "metric_name": metric_name,
+            "anomaly_devices": most_anomaly_ranks,
+            "group_data": slow_results[0]["group_data"],
+            "anomaly_locations": merged_anomaly_locations,
+            "detect_result_type": merged_anomaly_type
+        }
+
+    def _merge_anomaly_locations(self, merged_anomaly_locations: dict, anomaly_locations: dict):
+        for rank, locations in anomaly_locations.items():
+            for metric_name_with_aggre, timestamp_with_label in locations.items():
+                raw_metric_name = metric_name_with_aggre.split("!")[0]
+                if rank not in merged_anomaly_locations:
+                    merged_anomaly_locations[rank] = {raw_metric_name: timestamp_with_label}
+                elif raw_metric_name not in merged_anomaly_locations[rank]:
+                    merged_anomaly_locations[rank][raw_metric_name] = timestamp_with_label
+                else:
+                    tmp_value = merged_anomaly_locations[rank][raw_metric_name][1] + timestamp_with_label[1]
+                    merged_anomaly_locations[rank][raw_metric_name] = (
+                        merged_anomaly_locations[rank][raw_metric_name][0],
+                        tmp_value.astype(bool).astype(np.float32)
+                    )
+
+    def _merge_anomaly_types(self, merged_anomaly_type: dict, group_result_type: dict):
+        for rank, result_type in group_result_type.items():
+            for metric_name_with_aggre, detect_type in result_type.items():
+                raw_metric_name = metric_name_with_aggre.split("!")[0]
+                if rank not in merged_anomaly_type:
+                    merged_anomaly_type[rank] = {raw_metric_name: detect_type}
+                elif raw_metric_name not in merged_anomaly_type[rank]:
+                    merged_anomaly_type[rank][raw_metric_name] = detect_type
+
+
+if __name__ == "__main__":
+    pass
diff --git a/failslow/failslow/process/suppression.py b/failslow/failslow/process/suppression.py
new file mode 100644
index 0000000000000000000000000000000000000000..fc484c59f899ebecdf4a2daa3142b31946b0e8c6
--- /dev/null
+++ b/failslow/failslow/process/suppression.py
@@ -0,0 +1,100 @@
+# coding=utf-8
+"""
+Copyright (c) Huawei Technologies Co., Ltd. 2020-2028. All rights reserved.
+Description:
+FileName:suppression.py
+Author: c00570162/congdechun
+Create Date: 2025/4/19 15:38
+Notes:
+
+"""
+
+from datetime import datetime, timedelta
+from typing import List, Tuple, Dict
+
+from anteater.core.anomaly import Anomaly
+from anteater.utils.datetime import DateTimeManager as dt
+from anteater.utils.log import logger
+
+
+class AnomalySuppression:
+    """Suppression of repeatedly reported anomaly events
+
+    The same type of anomaly event is reported multiple times when an
+    abnormal system status is sustained over a long period of time.
+    AnomalySuppression reduces the number of identical anomaly events.
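+
+    Usage sketch (illustrative only; ``detected_anomalies`` stands for any
+    iterable of ``Anomaly`` objects and ``report`` for the caller's own
+    reporting function):
+
+        suppressor = AnomalySuppression(suppression_time=5)
+        for anomaly in detected_anomalies:
+            if not suppressor.suppress(anomaly):
+                report(anomaly)  # identical events within 5 minutes are dropped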
+    """
+
+    def __init__(self, suppression_time: int = 5) -> None:
+        """The Anomaly Suppression class initializer"""
+        self.look_back = suppression_time  # time window to look back (minutes)
+        self.max_len = 10000  # max length of the queue
+        self.ab_queue: List[Tuple[datetime, Anomaly]] = []
+
+    @property
+    def ab_machine_ids(self) -> List[str]:
+        """Gets recent anomalies' machine ids"""
+        return [a.machine_id for t, a in self.ab_queue]
+
+    @property
+    def ab_metrics(self) -> List[str]:
+        """Gets recent anomalies' metrics"""
+        return [a.metric for t, a in self.ab_queue]
+
+    @property
+    def ab_labels(self) -> List[Dict]:
+        """Gets recent anomalies' labels"""
+        return [a.labels for t, a in self.ab_queue]
+
+    def suppress(self, anomaly: Anomaly) -> bool:
+        """Suppresses the anomaly if an identical one was reported recently"""
+        if self._check_same_type(anomaly):
+            logger.info('An anomaly was suppressed by AnomalySuppression!')
+            return True
+
+        self._append(anomaly)
+        return False
+
+    def _update(self) -> None:
+        """Drops anomalies that are older than the look-back window"""
+        if not self.ab_queue:
+            return
+
+        timestamp = dt.utc_now()
+        tmp_ts = timestamp - timedelta(minutes=self.look_back)
+        self.ab_queue = [x for x in self.ab_queue if x[0] >= tmp_ts]
+
+    def _append(self, anomaly: Anomaly) -> None:
+        """Appends the anomaly to the queue"""
+        if len(self.ab_queue) > self.max_len:
+            self.ab_queue.pop(0)
+
+        timestamp = dt.utc_now()
+        self.ab_queue.append((timestamp, anomaly))
+
+    def _check_same_type(self, anomaly: Anomaly) -> bool:
+        """Checks whether a historical anomaly has the same machine id,
+        metric, labels and details
+        """
+        if not anomaly:
+            return False
+
+        self._update()
+        machine_id = anomaly.machine_id
+        metric = anomaly.metric
+        labels = anomaly.labels
+        details = anomaly.details
+
+        filtered_queue = []
+        for _, x in self.ab_queue:
+            if machine_id == x.machine_id and \
+                    metric == x.metric and \
+                    labels == x.labels and \
+                    details == x.details:
+                filtered_queue.append(x)
+
+        if filtered_queue:
+            return True
+
+        return False
+
diff --git a/failslow/failslow/tests/__init__.py b/failslow/failslow/tests/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..065cc5d1e21259e00cf78fd23df305ebdb02bedc
--- /dev/null
+++ b/failslow/failslow/tests/__init__.py
@@ -0,0 +1,18 @@
+# coding=utf-8
+"""
+Copyright (c) Huawei Technologies Co., Ltd. 2020-2028. All rights reserved.
+Description:
+FileName:__init__.py
+Author: c00570162/congdechun
+Create Date: 2025/5/9 14:43
+Notes:
+
+"""
+
+
+def main():
+    pass
+
+
+if __name__ == "__main__":
+    main()
diff --git a/failslow/failslow/tests/comm_slow.py b/failslow/failslow/tests/comm_slow.py
new file mode 100644
index 0000000000000000000000000000000000000000..bc3993aac7d5c308c72350615b9b701bdc820cc3
--- /dev/null
+++ b/failslow/failslow/tests/comm_slow.py
@@ -0,0 +1,33 @@
+# coding=utf-8
+"""
+Copyright (c) Huawei Technologies Co., Ltd. 2020-2028. All rights reserved.
+Description:
+FileName:comm_slow.py
+Author: c00570162/congdechun
+Create Date: 2025/3/8 15:44
+Notes:
+
+"""
+from typing import List
+
+
+def detect_most_anomaly_node(self, data_loader, anomaly_devices, get_rank2group_map, comm_slow_results: List):
+    choose_group_type = "PP"
+    topo_data = data_loader.topo_data
+    rank_id2groups = get_rank2group_map(topo_data)
+    anomaly_groups = set(rank_id2groups[choose_group_type][item] for item in anomaly_devices)
+    final_result = {"is_bounded_to_card": False, "bounded_results": {}}
+    for anomaly_group in anomaly_groups:
+        # score each rank together with its left neighbour, e.g. pairs (0,1), (1,2), (3,4)
+        detect_res = [1 if rankid in anomaly_devices else 0 for rankid in anomaly_group]
+        scores = [detect_res[i - 1 if i - 1 >= 0 else None:i + 1] for i in range(len(detect_res) - 1)]
+        scores = [sum(score) / len(score) for score in scores]
+        result = []
+        for i in range(len(scores) - 1):
+            if scores[i] == 0.5 and scores[i + 1] == 1:
+                result.append(anomaly_group[i + 1])
+            elif scores[i] == 1 and scores[i + 1] == 0.5:
+                result.append(anomaly_group[i])
+        if result:
+            final_result["is_bounded_to_card"] = True
+            final_result["bounded_results"].setdefault(choose_group_type + str(anomaly_group), result)
+    return final_result
diff --git a/failslow/failslow/tests/fft.py b/failslow/failslow/tests/fft.py
new file mode 100644
index 0000000000000000000000000000000000000000..a528c9f53346527652d5d31de38de06e5f36e062
--- /dev/null
+++ b/failslow/failslow/tests/fft.py
@@ -0,0 +1,36 @@
+# coding=utf-8
+"""
+Copyright (c) Huawei Technologies Co., Ltd. 2020-2028. All rights reserved.
+Description:
+FileName:fft.py
+Author: c00570162/congdechun
+Create Date: 2025/3/7 19:28
+Notes:
+
+"""
+import numpy as np
+import matplotlib.pyplot as plt
+
+
+def calculate_period(data, t):
+    # Run the Fourier transform; t is the time axis of the samples
+    fft_values = np.fft.fft(data)
+    frequencies = np.fft.fftfreq(len(data), t[1] - t[0])
+
+    # Take the magnitude and find the dominant frequency
+    abs_fft = np.abs(fft_values)
+    positive_frequencies = frequencies[:len(frequencies) // 2]
+    positive_abs_fft = abs_fft[:len(abs_fft) // 2]
+    main_frequency_index = np.argmax(positive_abs_fft)
+    main_frequency = positive_frequencies[main_frequency_index]
+
+    # Estimate the period
+    estimated_period = 1 / main_frequency if main_frequency != 0 else None
+    print(f"Estimated period: {estimated_period}")
+
+    # Plot the frequency spectrum
+    plt.figure(figsize=(10, 6))
+    plt.plot(positive_frequencies, positive_abs_fft)
+    plt.xlabel('Frequency')
+    plt.ylabel('Amplitude')
+    plt.title('Frequency Spectrum')
+    plt.show()
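+
+
+if __name__ == "__main__":
+    # Minimal usage sketch with synthetic data: a 2 Hz sine sampled at 100 Hz for 5 s;
+    # the estimated period printed by calculate_period should be about 0.5 s.
+    t = np.linspace(0, 5, 500, endpoint=False)
+    data = np.sin(2 * np.pi * 2 * t)
+    calculate_period(data, t)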
diff --git a/failslow/failslow/tests/kmeans_cluster.py b/failslow/failslow/tests/kmeans_cluster.py
new file mode 100644
index 0000000000000000000000000000000000000000..1c2b4a14155bd21c36be51c395d15e692c8d860c
--- /dev/null
+++ b/failslow/failslow/tests/kmeans_cluster.py
@@ -0,0 +1,49 @@
+# coding=utf-8
+"""
+Copyright (c) Huawei Technologies Co., Ltd. 2020-2028. All rights reserved.
+Description:
+FileName:kmeans_cluster.py
+Author: c00570162/congdechun
+Create Date: 2025/3/7 14:15
+Notes:
+
+"""
+
+
+import numpy as np
+import matplotlib.pyplot as plt
+from sklearn.cluster import KMeans
+
+# Generate sample time-series data
+np.random.seed(42)
+n_samples = 100
+n_time_steps = 20
+
+# Generate time series drawn from three different distributions
+data_cluster_1 = np.random.normal(loc=0, scale=1, size=(n_samples // 3, n_time_steps))
+data_cluster_2 = np.random.normal(loc=5, scale=2, size=(n_samples // 3, n_time_steps))
+data_cluster_3 = np.random.normal(loc=-5, scale=1.5, size=(n_samples - 2 * (n_samples // 3), n_time_steps))
+
+# Stack the data into one array
+data = np.vstack((data_cluster_1, data_cluster_2, data_cluster_3))
+
+# Extract features (mean and standard deviation of each series)
+features = np.hstack((np.mean(data, axis=1).reshape(-1, 1), np.std(data, axis=1).reshape(-1, 1)))
+
+# Cluster the series with K-means
+n_clusters = 3
+kmeans = KMeans(n_clusters=n_clusters, random_state=42)
+labels = kmeans.fit_predict(features)
+
+# Visualize the clustering result
+plt.figure(figsize=(12, 6))
+colors = ['r', 'g', 'b']
+for i in range(n_clusters):
+    cluster_data = data[labels == i]
+    for j in range(cluster_data.shape[0]):
+        plt.plot(cluster_data[j], color=colors[i], alpha=0.5)
+
+plt.title('Classification of Time Series Data')
+plt.xlabel('Time Step')
+plt.ylabel('Value')
+plt.show()
\ No newline at end of file
diff --git a/failslow/failslow/tests/tmp.py b/failslow/failslow/tests/tmp.py
new file mode 100644
index 0000000000000000000000000000000000000000..b050f9c93b5e280bdac64f52b5168c6a260f882f
--- /dev/null
+++ b/failslow/failslow/tests/tmp.py
@@ -0,0 +1,30 @@
+# coding=utf-8
+"""
+Copyright (c) Huawei Technologies Co., Ltd. 2020-2028. All rights reserved.
+Description:
+FileName:tmp.py
+Author: c00570162/congdechun
+Create Date: 2025/3/7 17:03
+Notes:
+
+"""
+from collections import Counter
+
+
+def main():
+    pass
+
+
+if __name__ == "__main__":
+    abc = '[6,7]'
+    abc = eval(abc)
+    print(abc, type(abc))
+    aggerated_data_dfs = [1, 2, 3]
+    new_detect_group_data = [{} for _ in range(len(aggerated_data_dfs))]
+    new_detect_group_data[0]["abc"] = "abc"
+    print(new_detect_group_data)
+    group_ranks = [1, 2, 3, 1]
+    anomaly_scores = [0] * len(group_ranks)
+    anomaly_scores[1] = 2
+    print(anomaly_scores)
+    g_ranks_dict = Counter(group_ranks)
+    print(g_ranks_dict)
\ No newline at end of file