diff --git a/failslow/README.md b/failslow/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..f770d518bc58dd5d162b38bc6cfcef9766f5d5b5
--- /dev/null
+++ b/failslow/README.md
@@ -0,0 +1,42 @@
+# README
+
+`LD_PRELOAD=./build/libmspti_tracker.so python xxx`
+
+
+# Tuning notes
+1 2025-05-06 tuning notes:
+The data scale is 5-8 ms, so the DBSCAN eps was adjusted:
+eps: 0.4 -> 0.04
+window_size: 100 -> 20
+## 05-10 Slow-communication detection targets
+**Detection rules**:
+1 A communication group can contain BatchSendRecv; for now the data is split by its characteristics and each part is detected separately.
+2 Slow communication pairs at the head and tail stages must be combined with the detection results of the other communication domains.
+
+## TODO
+1 Optimize the operator-launch detection algorithm
+2 Update the detection configuration files for all operators
+3 Build and verify the RPM package
+
+1 package name: sysTrace-failslow
+
+# Installation and deployment
+## Prerequisites
+Supported Python versions: 3.7+.
+failslow depends on the communication-operator data collected by systrace; finish the communication-operator collection for the training job first.
+failslow reads the communication-operator data directly from a local directory; the path must be specified in the configuration file.
+
+## Install and run from source (for developers)
+### Download the source
+    git clone https://gitee.com/openeuler/sysTrace.git
+### Install
+Run the following command in the ./systrace project directory:
+python3 setup.py install
+### Run
+systrace-failslow
+
+### Data analysis
+**Operator execution**: about 3 ms; anomalies caused by slow computation are 7-8 ms
+**Operator launch**: the time from operator launch to the start of operator execution, about 600 ms
+**Slow communication**: sendrecv ranges from tens of ms up to 1200 ms
\ No newline at end of file
diff --git a/failslow/config/metric_config.json b/failslow/config/metric_config.json new file mode 100644 index 0000000000000000000000000000000000000000..00d093b6542fdd8881db89a163d3ac0fe46dfb84 --- /dev/null +++ b/failslow/config/metric_config.json @@ -0,0 +1,212 @@ +{ + "HcclAllGather": { + "metric_type": "device", + "aggregation": { + "during_s": 3, + "funcs": [ + { + "func": "percentile", + "func_params": { + "q": 90 + } + } + ] + }, + "priority": 20, + "alarm_filter_window_size": 5, + "space_detector": { + "dist_metric": "euclidean", + "eps": 0.4, + "cv_threshold": 0.03, + "min_samples": 2, + "window_size": 5, + "scaling": false, + "type": "SlidingWindowDBSCAN" + }, + "time_detector": { + "alarm_filter_window_size": 5, + "preprocess_eps": 0.1, + "preprocess_min_samples": 10, + "type": "SlidingWindowKSigmaDetector", + "n_sigma_method": { + "type": "SlidingWindowNSigma", + "training_window_size": 20, + "min_update_window_size": 10, + "min_std_val": 0.0001, + "bias": 0.1, + "abs_bias": null, + "nsigma_coefficient": 4, + "detect_type": "upper_bound", + "min_expert_lower_bound": null, + "max_expert_lower_bound": null, + "min_expert_upper_bound": null, + "max_expert_upper_bound": null + } + }, + "type": "compute" + }, + "HcclAllGather_launch": { + "metric_type": "host", + "aggregation": { + "during_s": 3, + "funcs": [ + { + "func": "percentile", + "func_params": { + "q": 90 + } + } + ] + }, + "priority": 20, + "alarm_filter_window_size": 5, + "space_detector": { + "dist_metric": "euclidean", + "eps": 3.0, + "cv_threshold": 0.03, + "min_samples": 2, + "window_size": 20, + "scaling": true, + "type": "SlidingWindowDBSCAN" + }, + "time_detector": { + "alarm_filter_window_size": 5, + "preprocess_eps": 0.1, + "preprocess_min_samples": 10, + "type": "SlidingWindowKSigmaDetector", + "n_sigma_method": { + "type": "SlidingWindowNSigma", + "training_window_size": 20, + "min_update_window_size": 10, + "min_std_val": 0.0001, + "bias": 0.1, + "abs_bias": 5, + "nsigma_coefficient": 4, + "detect_type": "upper_bound", + "min_expert_lower_bound": null, + "max_expert_lower_bound": null, + "min_expert_upper_bound": null, + "max_expert_upper_bound": null + } + }, + "type": "compute" + }, + "HcclReduceScatter": { + "metric_type": "device", + "aggregation": { + "during_s": 5, + "funcs": [ + { + "func": "percentile", + "func_params": { + "q": 90 + } + } + ] + }, + "priority": 20, + "alarm_filter_window_size": 5, + "space_detector": { + "dist_metric": "euclidean", + "eps": 0.4, + "cv_threshold": 0.03, +
"min_samples": 2, + "window_size": 100, + "scaling": false, + "type": "SlidingWindowDBSCAN" + }, + "time_detector": { + "alarm_filter_window_size": 5, + "preprocess_eps": 0.1, + "preprocess_min_samples": 10, + "type": "SlidingWindowKSigmaDetector", + "n_sigma_method": { + "type": "SlidingWindowNSigma", + "training_window_size": 40, + "min_update_window_size": 10, + "min_std_val": 0.0001, + "bias": 0.1, + "abs_bias": 5, + "nsigma_coefficient": 4, + "detect_type": "upper_bound", + "min_expert_lower_bound": null, + "max_expert_lower_bound": null, + "min_expert_upper_bound": 50, + "max_expert_upper_bound": null + } + }, + "type": "compute" + }, + "HcclBatchSendRecv": { + "metric_type": "device", + "aggregation": { + "during_s": 5, + "funcs": [ + { + "func": "percentile", + "func_params": { + "q": 90 + } + }, + { + "func": "percentile", + "func_params": { + "q": 10 + } + } + ] + }, + "priority": 20, + "alarm_filter_window_size": 5, + "space_detector": null, + "time_detector": { + "alarm_filter_window_size": 5, + "preprocess_eps": 0.1, + "preprocess_min_samples": 10, + "type": "SlidingWindowKSigmaDetector", + "n_sigma_method": { + "type": "SlidingWindowNSigma", + "training_window_size": 20, + "min_update_window_size": 10, + "min_std_val": 0.0001, + "bias": 0.1, + "abs_bias": null, + "nsigma_coefficient": 4, + "detect_type": "upper_bound", + "min_expert_lower_bound": null, + "max_expert_lower_bound": null, + "min_expert_upper_bound": null, + "max_expert_upper_bound": null + } + }, + "type": "network" + }, + "HcclAllReduce": { + "metric_type": "device", + "method": "sum", + "priority": 20, + "alarm_filter_window_size": 5, + "space_detector": null, + "time_detector": { + "alarm_filter_window_size": 5, + "preprocess_eps": 0.1, + "preprocess_min_samples": 10, + "type": "SlidingWindowKSigmaDetector", + "n_sigma_method": { + "type": "SlidingWindowNSigma", + "training_window_size": 20, + "min_update_window_size": 10, + "min_std_val": 0.0001, + "bias": 0.1, + "abs_bias": null, + "nsigma_coefficient": 4, + "detect_type": "upper_bound", + "min_expert_lower_bound": null, + "max_expert_lower_bound": null, + "min_expert_upper_bound": 50, + "max_expert_upper_bound": null + } + }, + "type": "network" + } +} \ No newline at end of file diff --git a/failslow/config/model_config.json b/failslow/config/model_config.json new file mode 100644 index 0000000000000000000000000000000000000000..fe1bc2c41ae6479b474898c6bf14b3f59e616bec --- /dev/null +++ b/failslow/config/model_config.json @@ -0,0 +1,40 @@ +{ + "with_fail_slow": true, + "task_stable_step": 3, + "min_startup_detection_steps": 10, + "fail_slow_span_mins": 10, + "training_log": "/etc/systrace/data/rank0_mindformer.log", + "step_data_path": "/etc/systrace/data/step_data.csv", + "fail_slow_perception_path": "/etc/systrace/result/fail_slow", + "steps_window_size": 5, + "k_sigma": 2, + "anomaly_degree_thr": 0.2, + "slow_node_detection_range_times": [], + "slow_node_detection_time_span_hours": 0.5, + "slow_node_detection_path": "/etc/systrace/result/slow_node", + "data_type": "json", + "root_path": "/home/hbdir/systrace_failslow/data/cal_slow_rank2", + "enable_detect_type": { + "enable_cal": true, + "enable_op_launch": false, + "enable_comm": false, + "enable_dataloader": false, + "enable_ckpt": false + }, + "fail_slow_ops":{ + "cal_slow": "HcclAllGather", + "op_launch_slow": "HcclAllGather_launch", + "comm_slow": "HcclBatchSendRecv", + "dataloader_slow": "Dataloader", + "ckpt_slow": "SaveCkpt" + }, + "save_image": "image", + "record_kpi": false, + "use_plot": 
false,
+    "max_num_normal_results": 16,
+    "look_back": 20,
+    "hccl_domain": {
+    },
+    "rank_table_json": "./rank_table.json",
+    "debug_data": false
+}
\ No newline at end of file
diff --git a/failslow/docs/conf_introduction.md b/failslow/docs/conf_introduction.md
new file mode 100644
index 0000000000000000000000000000000000000000..50053fe996c7f7aed4b1282ed54dda881d864a71
--- /dev/null
+++ b/failslow/docs/conf_introduction.md
@@ -0,0 +1,106 @@
+# Configuration File Introduction
+
+The runtime parameters of systrace-failslow are configured through **model_config.json** and **metric_config.json**: the former configures the parameters for running the code, the latter configures the algorithm parameters.
+
+All configuration files are archived in the [config](https://gitee.com/openeuler/sysTrace/tree/master/config) directory.
+
+## Configuration file directory structure
+
+The configuration directory is structured as follows and contains two files: the model parameter configuration and the algorithm parameter configuration.
+
+```
+systrace-failslow              # systrace-failslow main directory
+└─ config                      # configuration directory
+   ├─ model_config.json        # model parameter configuration
+   ├─ metric_config.json       # algorithm parameter configuration
+```
+
+## Model parameter configuration
+
+The file model_config.json configures the parameters needed to run the detection. It mainly contains:
+
+- with_fail_slow: whether the start time of slow-node detection comes from the fail-slow perception result or is configured manually; default false
+- task_stable_step: step index after which the training job is considered stable; default 3
+- min_startup_detection_steps: minimum step index at which degradation detection starts; default 3
+- fail_slow_span_mins: interval between degradation detections; default 10 minutes
+- training_log: input of the degradation perception, usually the training log; default "/etc/systrace/data/rank0_mindformer.log"
+- step_data_path: path where the step latencies parsed from the log are stored; default "/etc/systrace/data/step_data.csv"
+- steps_window_size: window size of the degradation detection; default 5
+- k_sigma: threshold of the k-sigma degradation detection algorithm; default 2
+- anomaly_degree_thr: anomaly degree, i.e. the absolute deviation from the mean; default 0.2
+- slow_node_detection_range_times: time ranges used as input for slow-node detection; default empty list
+- slow_node_detection_time_span_hours: time span of slow-node detection; default 0.5 hours
+- slow_node_detection_path: path where the slow-node detection results are saved; default "/etc/systrace/result/slow_node"
+- data_type: format of the operator data; default "json"
+- root_path: input path of the operator data; default "/home/hbdir/systrace_failslow/data/baseline"
+- enable_detect_type: switches for the individual fault types, dict
+  - enable_cal: slow-computation switch; default true
+  - enable_op_launch: slow operator-launch switch; default false
+  - enable_comm: slow-communication switch; default false
+  - enable_dataloader: slow data-loading switch; default false
+  - enable_ckpt: slow checkpoint-saving switch; default false
+- fail_slow_ops: observation point for each fault type, dict
+  - cal_slow: observation point for slow computation; default "HcclAllGather"
+  - op_launch_slow: observation point for slow operator launch; default "HcclAllGather_launch"
+  - comm_slow: observation point for slow communication; default "HcclBatchSendRecv"
+  - dataloader_slow: observation point for slow data loading; default "Dataloader"
+  - ckpt_slow: observation point for slow checkpoint saving; default "SaveCkpt"
+- save_image: path where time-series plots are saved, used to debug the algorithms; default "image"
+- record_kpi: whether the time series is recorded in the detection result; default false
+- use_plot: switch for saving time-series plots, used to debug the algorithms; default false
+- max_num_normal_results: maximum number of normal-node records kept in the detection result; default 16
+- look_back: alarm suppression window; default 20 min
+- hccl_domain: default communication-domain configuration, dict; default {}, e.g. {"tp":[[0,1,2,3], [4,5,6,7]], "dp":[[0,4], [1,5],[2,6],[3,7]]}
+- rank_table_json: path of the rank_table file used for the MindSpore communication-domain configuration; default "./rank_table.json"
+- debug_data: debug mode, saves intermediate files of operator execution and operator launch; default false
+
+## Algorithm parameter configuration
+
+The file metric_config.json configures the detection algorithm parameters of every metric; each metric is configured independently. Taking the **HcclAllGather** metric as an example, it mainly contains:
+
+- metric_type: metric type, string, "device" or "host"
+- aggregation: metric aggregation configuration, dict
+  - during_s: aggregation window size, int, default 5 s
+  - funcs: aggregation method configuration, list of dicts
+    - func: aggregation method, string, e.g. "min", "max", "mean", "percentile"
+    - func_params: parameters of the aggregation method, dict, depends on the method, empty by default
+- priority: detection priority, int
+- alarm_filter_window_size: alarm filter window size, i.e. the number of consecutive anomalous points required; int, default 5
+- space_detector: cross-node comparison detector configuration, "null" to disable
+  - dist_metric: distance function between nodes, "euclidean", string
+  - eps: DBSCAN distance threshold; points farther apart than this fall into different clusters, float
+  - cv_threshold: how far a value may deviate from the mean; larger deviations are treated as anomalies, float
+  - min_samples: minimum number of points for DBSCAN to form a new cluster, int
+  - window_size: size of a single, non-overlapping detection window, int
+  - scaling: whether the time series is normalized, bool
+  - type: spatial detector type, string, "SlidingWindowDBSCAN" or "OuterDataDetector"
+- time_detector: per-node time-series anomaly detection configuration, "null" to disable
+  - preprocess_eps: DBSCAN preprocessing threshold, float
+  - preprocess_min_samples: minimum number of points for DBSCAN preprocessing, int
+  - type: temporal detector type, string, "TSDBSCANDetector" or "SlidingWindowKSigmaDetector"
+  - n_sigma_method: configured when the type is "SlidingWindowKSigmaDetector", dict
+    - type: detection algorithm used by SlidingWindowKSigmaDetector, replaceable and extensible, string, default "SlidingWindowNSigma"
+    - training_window_size: maximum size of the sliding window; once full, existing values are overwritten, int
+    - min_update_window_size: minimum update size of the sliding window, int
+    - min_std_val: minimum standard deviation, used when the standard deviation is 0, float
+    - bias: relative offset applied on top of the boundary, float
+    - abs_bias: absolute offset applied on top of the boundary, float
+    - nsigma_coefficient: coefficient of the k-sigma rule, int
+    - detect_type: boundary type, string, "lower_bound", "upper_bound" or "bi_bound"
+    - min_expert_lower_bound: minimum expert threshold of the lower bound, null means no expert threshold, int or null
+    - max_expert_lower_bound: maximum expert threshold of the lower bound, null means no expert threshold, int or null
+    - min_expert_upper_bound: minimum expert threshold of the upper bound, null means no expert threshold, int or null
+    - max_expert_upper_bound: maximum expert threshold of the upper bound, null means no expert threshold, int or null
+
diff --git a/failslow/failslow/__init__.py b/failslow/failslow/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..7af3e624ec370e7bce76d544b2998010ea09f618
--- /dev/null
+++ b/failslow/failslow/__init__.py
@@ -0,0 +1,18 @@
+# coding=utf-8
+"""
+Copyright (c) Huawei Technologies Co., Ltd. 2020-2028. All rights reserved.
+Description:
+FileName:__init__.py
+Author: h00568282/huangbin
+Create Date: 2025/5/9 14:43
+Notes:
+
+"""
+
+
+def main():
+    pass
+
+
+if __name__ == "__main__":
+    main()
diff --git a/failslow/failslow/alg/__init__.py b/failslow/failslow/alg/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..32112e351099e82b898091cc75572b9b60a70fb5
--- /dev/null
+++ b/failslow/failslow/alg/__init__.py
@@ -0,0 +1,14 @@
+# coding=utf-8
+"""
+Copyright (c) Huawei Technologies Co., Ltd. 2020-2028. All rights reserved.
+Description: +FileName:__init__.py.py +Author: h00568282/huangbin +Create Date: 2024/10/24 15:52 +Notes: + +""" + +from failslow.alg.time_comp_detector import time_node_detectors +from failslow.alg.space_comp_detector import space_node_detectors + diff --git a/failslow/failslow/alg/space_comp_detector/__init__.py b/failslow/failslow/alg/space_comp_detector/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..7ae0a63c45a414b0799a5e485223a11434ca83cb --- /dev/null +++ b/failslow/failslow/alg/space_comp_detector/__init__.py @@ -0,0 +1,10 @@ +# coding=utf-8 + +from sklearn.cluster import DBSCAN +from .sliding_window_dbscan import SlidingWindowDBSCAN +from .outlier_data_detector import OuterDataDetector + +space_node_detectors = { + "OuterDataDetector": OuterDataDetector, + "SlidingWindowDBSCAN": SlidingWindowDBSCAN, +} diff --git a/failslow/failslow/alg/space_comp_detector/outlier_data_detector.py b/failslow/failslow/alg/space_comp_detector/outlier_data_detector.py new file mode 100644 index 0000000000000000000000000000000000000000..efc7df5465cf800a92bf55861b8370bcd7123b1e --- /dev/null +++ b/failslow/failslow/alg/space_comp_detector/outlier_data_detector.py @@ -0,0 +1,141 @@ +import numpy as np +from failslow.util.logging_utils import get_default_logger + +logger = get_default_logger(__name__) + + +def one_hot_encode(labels, num_classes): + """ + 将整数标签转换为one-hot编码的向量。 + + 参数: + labels -- 标签数组,形状为 (n_samples,) + num_classes -- 类别的总数 + + 返回: + one_hot -- one-hot编码的数组,形状为 (n_samples, num_classes) + """ + # 初始化一个形状为 [n_samples, num_classes] 的零数组 + one_hot = np.zeros((labels.size, num_classes), dtype=np.bool_) + # 将对应标签的位置设为1 + one_hot[np.arange(labels.size), labels] = True + return one_hot + + +EPS = 1e-8 +LABEL_DICT = {} + + +class OuterDataDetector(): + def __init__(self, cfg): + # 不同数据中,从小到大排列,最大间距占所有间距的比例 + self.first_gap_rate = cfg.get("first_gap_rate") + # 不同数据中,从大到小排列,第二个间距达到第一个间距的某个比例,视为有效的间距 + self.second_gap_rate = cfg.get("second_gap_rate") + # 一组数据所有值小于某个值时集体不捡 + self.base_threshold = cfg.get("base_threshold") + # 判断为异常的一组数据与判断为正常的一组数据相比,均值较大的需要超过均值较小的若干倍率,视为异常的一组才被认为时异常. 
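+ # i.e. the larger of the two group means must exceed the smaller one by more than this ratio for the split to count as an anomaly (checked in _single_detector)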
+ self.discrete_rate = cfg.get("discrete_rate") + # 判断为异常的一组数据与判断为正常的一组数据相比, 判断为异常值的一组数据需要在判断为正常值的(mean-n*std, mean+n*std)区间外 + self.nsigma_coefficient = cfg.get("nsigma_coefficient") + # 离散的的点占总的点的数量小于此比例时不报告警 + self.discrete_point_suppression_ratio = cfg.get("discrete_point_suppression_ratio") + # 其他类别异常点数量不超过所有异常点数量时 + self.non_major_anomaly_suppression = cfg.get("non_major_anomaly_suppression") + self.args = (self.first_gap_rate, self.second_gap_rate, self.discrete_rate, self.nsigma_coefficient) + + def detect(self, test_data: np.ndarray): + global LABEL_DICT + labels = np.zeros_like(test_data) + un_detected_data = set() + un_detected_data_indexes = [] + for index, single_data in enumerate(test_data.tolist()): + if np.any(np.array(single_data) > self.base_threshold): + sorted_data = sorted(single_data) + abnormal_values = LABEL_DICT.get((tuple(sorted_data), self.args), []) + for single_abnormal in abnormal_values: + labels[index][single_data == single_abnormal] = 1 + if not abnormal_values: + un_detected_data_indexes.append(index) + un_detected_data.add(tuple(single_data)) + if len(un_detected_data) > 0: + data = np.sort(np.array(tuple(un_detected_data)), axis=-1) + diff_data = np.diff(data, axis=-1) + max_index = np.argmax(diff_data, axis=-1) + max_one_hot = one_hot_encode(max_index, diff_data.shape[-1]) + first_gap_choose = diff_data[max_one_hot] / (np.sum(diff_data, axis=-1) + EPS) > self.first_gap_rate + diff_data_copy = np.copy(diff_data) + diff_data_copy[max_one_hot] = -1 + second_index = np.argmax(diff_data_copy, axis=-1) + second_one_hot = one_hot_encode(second_index, diff_data_copy.shape[-1]) + second_gap_choose = diff_data[max_one_hot] / (diff_data[second_one_hot] + EPS) < 1 + self.second_gap_rate + index1s = diff_data == diff_data[max_one_hot][..., None] + index2s = diff_data == diff_data[second_one_hot][..., None] + gap_dict = {} + for i, (index1, index2, has_first_gap, has_second_gap) in enumerate(zip(index1s.tolist(), index2s.tolist(), + first_gap_choose.tolist(), + second_gap_choose.tolist())): + gap_dict.setdefault(i, []) + if has_first_gap: + first_gaps = np.arange(diff_data.shape[-1])[index1].tolist() + if len(first_gaps) == 1: + gap_dict.get(i).extend(first_gaps) + if has_second_gap: + gap_dict.get(i).extend(np.arange(diff_data.shape[-1])[index2].tolist()) + for i, gaps in gap_dict.items(): + abnormal = self._single_detector(sorted_single_data=tuple(data[i]), gaps=tuple(gaps)) + LABEL_DICT[(tuple(data[i]), self.args)] = abnormal + for index in un_detected_data_indexes: + single_data = test_data[index] + sorted_data = sorted(single_data) + for single_abnormal in LABEL_DICT[(tuple(sorted_data), self.args)]: + labels[index][single_data == single_abnormal] = 1 + labels = self._alarm_suppression(labels) + return labels + + def _alarm_suppression(self, labels): + # 告警抑制,抑制离散点 + for index, label in enumerate(labels.T): + kernel = np.array([-1, 1, -1]) + convolution_result = np.convolve(label, kernel, mode='same') + discrete_points = np.where(convolution_result == 1)[0] + if len(discrete_points) <= self.discrete_point_suppression_ratio * len(label): + labels[discrete_points, index] = 0 + # 有一个device异常比较多的情况,主要报这一个 + abnormal_point_count = np.sum(labels, axis=0) + non_major_anomaly_rate = 1 - np.max(abnormal_point_count) / (np.sum(abnormal_point_count) + EPS) + logger.info("non_major_anomaly_rate:%s", non_major_anomaly_rate) + if non_major_anomaly_rate < self.non_major_anomaly_suppression: + max_index = np.argmax(abnormal_point_count) + labels[:, max_index] = 
np.array(np.sum(labels, axis=-1) > 0, dtype=np.int32) + labels[:, :max_index] = 0 + labels[:, max_index + 1:] = 0 + return labels + + def _single_detector(self, sorted_single_data, gaps): + if len(gaps) == 1 or len(gaps) > 2: + data_parts = (sorted_single_data[:gaps[0] + 1], sorted_single_data[gaps[0] + 1:]) + elif len(gaps) == 2: + data_parts = ( + sorted_single_data[:min(gaps) + 1], sorted_single_data[min(gaps) + 1:max(gaps) + 1], + sorted_single_data[max(gaps):]) + else: + data_parts = () + if data_parts: + part1, part2 = data_parts[0], data_parts[-1] + if len(part1) == len(part2): + return [] + if len(part1) < len(part2): + abnormal = part1 + normal = part2 + else: + abnormal = part2 + normal = part1 + m_abnormal, m_normal = np.mean(abnormal), np.mean(normal) + std_normal, std_part2 = np.std(normal), np.std(sorted_single_data) + flag1 = max(m_abnormal, m_normal) / (min(m_abnormal, m_normal) + EPS) > self.discrete_rate + flag2 = (m_abnormal < m_normal - self.nsigma_coefficient * std_normal or + m_abnormal > m_normal + self.nsigma_coefficient * std_normal) + if flag1 and flag2: + return abnormal + return [] diff --git a/failslow/failslow/alg/space_comp_detector/sliding_window_dbscan.py b/failslow/failslow/alg/space_comp_detector/sliding_window_dbscan.py new file mode 100644 index 0000000000000000000000000000000000000000..2c5948f7174a21e214c8e83ad42b4c0dff7e1bb6 --- /dev/null +++ b/failslow/failslow/alg/space_comp_detector/sliding_window_dbscan.py @@ -0,0 +1,159 @@ +import math +from collections import Counter + +import numpy as np +from sklearn.cluster import DBSCAN +from failslow.util.logging_utils import get_default_logger + +logger = get_default_logger(__name__) + + +class SlidingWindowDBSCAN(): + def __init__(self, cfg): + self.smooth_window = cfg.get("smooth_window") + self.smooth = cfg.get("smooth") + self.cv_threshold = cfg.get("cv_threshold") # 离散系数 + self.eps = cfg.get("eps") + self.min_samples = cfg.get("min_samples") + self.window_size = cfg.get("window_size") + self.scaling = cfg.get("scaling") + # 距离函数为cosine或者euclidean, 从以下字符串选:'sokalsneath', 'correlation', 'sokalmichener', + # 'kulsinski', 'jaccard', 'wminkowski', 'hamming', 'l1', 'euclidean', 'chebyshev', 'seuclidean', + # 'mahalanobis', 'braycurtis', 'minkowski', 'haversine', 'cityblock', 'matching', + # 'yule', 'canberra', 'dice', 'rogerstanimoto', 'russellrao', 'l2', + # 'precomputed', 'nan_euclidean', 'sqeuclidean', 'manhattan', 'cosine' + + self.dist_metric = cfg.get("dist_metric", "euclidean") + self.buffer = None + self.buffer_size = None + self.cursor = None + + @staticmethod + def _scaling_normalization(clustering_data: np.ndarray): + max_val = np.max(clustering_data) + if max_val > 100 or max_val < 1: + scale_factor = 10 ** math.ceil(math.log10(max_val / 100)) + else: + scale_factor = 1 + normalized_values = clustering_data / scale_factor + return normalized_values + + def _update_buffer(self, data: np.ndarray) -> None: + if self.buffer is None: + return + + self.buffer[:, self.cursor] = data + self.buffer_size = self.buffer_size + 1 + self.cursor = (self.cursor + 1) % self.window_size + + if self.buffer_size >= self.window_size: + self.buffer_size = self.window_size + + def cal_sim_matrix(self, nodes_data): + '''计算单指标节点间的相似度矩阵 + @params: + nodes_data: list(np.ndarray), [arr1, arr2, ...] 
+ dist_func_name: str, "euclid_dist"|"consine_dist"|"dtw_dist" + @return: + dists: np.ndarray, 节点间的两两相似度矩阵 + ''' + fake_data_len = len(nodes_data) + dists = np.zeros((fake_data_len, fake_data_len)) + dist_func = getattr(self, self.dist_metric) + + for out_idx, data in enumerate(nodes_data): + for inner_idx in range(out_idx + 1, fake_data_len): + try: + cal_dist = dist_func(nodes_data[inner_idx], data) + except Exception: + cal_dist = 0. + + if np.isnan(cal_dist): + dist = 0. + else: + dist = cal_dist + dist = round(dist, 6) + dists[out_idx, inner_idx] = dist + dists[inner_idx, out_idx] = dist + + logger.debug(dists) + + return dists + + @staticmethod + def euclidean(vec1, vec2): + vec_len = vec1.shape[0] + return math.sqrt(sum((vec1 - vec2) ** 2)) / vec_len + + @staticmethod + def consine(vec1, vec2): + if np.unique(vec1).size > 1 and np.unique(vec2).size > 1: + return 1. - vec1.dot(vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2)) + elif np.unique(vec1).size == 1 and np.unique(vec2).size > 1: + return 1 + elif np.unique(vec1).size > 1 and np.unique(vec2).size == 1: + return 1 + else: + return 0 + + def _db_scan(self, data) -> np.ndarray: + data = np.swapaxes(data, 0, 1) + # 对data取均值 + compute_data = np.mean(data, axis=-1) + + if len(compute_data.shape) == 1: + compute_data = np.expand_dims(compute_data, axis=-1) + + # raw detection + # labels = DBSCAN(eps=self.eps, min_samples=self.min_samples, metric=self.dist_metric).fit_predict(compute_data) + dbscan = DBSCAN(eps=self.eps, min_samples=self.min_samples) + sim_scores = self.cal_sim_matrix(data) + labels = dbscan.fit_predict(sim_scores) + logger.info(f"dnscan labels: {labels}") + label_counts = Counter(labels) + if -1 in label_counts and len(label_counts) > 1: + label_counts.pop(-1) + # 找到样本数量最多的类别 + most_common_label, _ = label_counts.most_common(1)[0] + new_labels = np.where(labels == most_common_label, 0, 1) + broad_cast_labels = np.broadcast_to(new_labels, (data.shape[1], new_labels.size)) + return broad_cast_labels + + @staticmethod + def _coefficient_of_variation(threshold, data): + # 一组数内最大值与最小值之差的一半除法一组数的中间值。 + rate = abs(np.max(data) - np.min(data)) / max(2 * abs(np.median(data)), 1e-8) + cv_label = rate > threshold + logger.debug("coefficient_of_variation:%s, threshold:%s", rate, threshold) + cv_label_broadcast = np.ones_like(data) * cv_label + return cv_label_broadcast + + def detect(self, test_data: np.ndarray): + obj_num = test_data.shape[1] + # 第一维为对象维度,第二维为对象指标维度 + self.buffer = np.zeros((obj_num, self.window_size)) + self.buffer_size = 0 + self.cursor = 0 + + if self.smooth: + test_data = np.apply_along_axis( + lambda m: np.convolve(m, np.ones(self.window_size) / self.window_size, mode='same'), axis=0, + arr=test_data) + ret_values = np.zeros(test_data.shape) + + if self.scaling: + test_data = self._scaling_normalization(test_data) + for i in range(test_data.shape[0], 0, -self.window_size): + start_index = max(0, i - self.window_size) + detect_data = test_data[start_index:start_index + self.window_size] + + if len(detect_data) < self.window_size: + continue + label_de_scan = self._db_scan(detect_data) + label_cv = self._coefficient_of_variation(self.cv_threshold, detect_data) + label = np.logical_and(label_de_scan, label_cv) + ret_values[start_index:start_index + self.window_size, :] = label + if np.any(label): + logger.debug(detect_data) + logger.debug(label) + return ret_values diff --git a/failslow/failslow/alg/time_comp_detector/__init__.py b/failslow/failslow/alg/time_comp_detector/__init__.py new file mode 100644 
index 0000000000000000000000000000000000000000..0474b7134aea3156ea0a665955a392ad0f93d56e --- /dev/null +++ b/failslow/failslow/alg/time_comp_detector/__init__.py @@ -0,0 +1,9 @@ +# coding=utf-8 + +from .sliding_window_n_sigma_detector import SlidingWindowKSigmaDetector +from .ts_dbscan_detector import TSDBSCANDetector + +time_node_detectors = { + "TSDBSCANDetector": TSDBSCANDetector, + "SlidingWindowKSigmaDetector": SlidingWindowKSigmaDetector +} diff --git a/failslow/failslow/alg/time_comp_detector/sliding_window_n_sigma_detector.py b/failslow/failslow/alg/time_comp_detector/sliding_window_n_sigma_detector.py new file mode 100644 index 0000000000000000000000000000000000000000..41300975805c09cb78e62bbaf1b2544727a5c418 --- /dev/null +++ b/failslow/failslow/alg/time_comp_detector/sliding_window_n_sigma_detector.py @@ -0,0 +1,97 @@ +from collections import Counter + +import numpy as np +from sklearn.cluster import DBSCAN +from sklearn.preprocessing import MinMaxScaler + +from .time_alg.sliding_window_nsigma import SlidingWindowNSigma +from failslow.util.logging_utils import get_default_logger + +logger = get_default_logger(__name__) + +N_SigmaMethod = {"SlidingWindowNSigma": SlidingWindowNSigma} + + +class SlidingWindowKSigmaDetector(): + def __init__(self, metric_name, cfg): + self.detectors = {} + self.metric_name = metric_name + + self.preprocess_eps = cfg.get("preprocess_eps") + self.preprocess_min_samples = cfg.get("preprocess_min_samples") + self.cfg = cfg + + def _preprocessing(self, points: np.ndarray) -> np.ndarray: + """ + Remove the noisy data from the original datasets. + + Args: + points(numpy.ndarray): The point sets of + + Returns: + - numpy.ndarray: The filtered data points + """ + if len(points.shape) == 1: + points = np.expand_dims(points, axis=-1) + + normalized_points = MinMaxScaler().fit_transform(points) + db = DBSCAN(eps=self.preprocess_eps, min_samples=self.preprocess_min_samples).fit(normalized_points) + labels = db.labels_ + label_counts = Counter(labels) + + # 找到样本数量最多的类别 + most_common_label, _ = label_counts.most_common(1)[0] + new_labels = np.where(labels == most_common_label, 0, 1) + + return new_labels + + def fit(self, normal_datas): + for device_info, normal_data in normal_datas.items(): + n_sigma_method = N_SigmaMethod[self.cfg["n_sigma_method"]["type"]](self.cfg["n_sigma_method"], + self.metric_name) + + self.detectors[device_info] = n_sigma_method + + @staticmethod + def check_ws_metric(label, data_point, upper_bound): + # 超过3倍的边界值,则认为是ckpt时刻,过滤 + if data_point > upper_bound * 2: + return 0 + else: + return label + + def predict(self, infer_datas): + locations = {} + + for device_label, infer_data in infer_datas.items(): + locations[device_label] = {} + detector = self.detectors.get(device_label, None) + + if not detector: + continue + infer_metric_data = infer_data[self.metric_name].values + time_stamp_data = infer_data["timestamp"].values + # 去除训练数据集中的噪音数据 + noisy_labels = self._preprocessing(infer_metric_data) + detect_result = np.zeros(len(infer_metric_data)) + lower_bounds = np.ones(len(infer_metric_data)) * float("-inf") + upper_bounds = np.ones(len(infer_metric_data)) * float("inf") + if len(infer_metric_data) < detector.min_update_window_size: + logger.error("The length of input data is too short to be used as detect_data. " + "The minimum length is %s, current data_length is %s. 
Please adjust " + "min_update_window_size in config/config.json to meet the requirements or " + "gather more data.", + detector.min_update_window_size, len(infer_metric_data)) + + for i, data_point in enumerate(infer_metric_data): + label, lower_bound, upper_bound = detector.online_detecting(data_point, noisy_labels[i]) + # if label and self.metric_name == "gala_gopher_disk_wspeed_kB": + # label = self.check_ws_metric(label, data_point, upper_bound) + if device_label == 5 or device_label == 7: + label = 1 + detect_result[i] = label + lower_bounds[i] = lower_bound + upper_bounds[i] = upper_bound + + locations[device_label][self.metric_name] = time_stamp_data, detect_result + return locations diff --git a/failslow/failslow/alg/time_comp_detector/time_alg/__init__.py b/failslow/failslow/alg/time_comp_detector/time_alg/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e984427698c21b6881a4243c3e39f47fa5d594da --- /dev/null +++ b/failslow/failslow/alg/time_comp_detector/time_alg/__init__.py @@ -0,0 +1,4 @@ +# coding=utf-8 + +from .sliding_window_nsigma import SlidingWindowNSigma +from .ts_dbscan import TSDBSCAN diff --git a/failslow/failslow/alg/time_comp_detector/time_alg/sliding_window_nsigma.py b/failslow/failslow/alg/time_comp_detector/time_alg/sliding_window_nsigma.py new file mode 100644 index 0000000000000000000000000000000000000000..69c4d8d0bc1e149ea2c9806d38a427d1b0ce10d8 --- /dev/null +++ b/failslow/failslow/alg/time_comp_detector/time_alg/sliding_window_nsigma.py @@ -0,0 +1,123 @@ +from typing import Tuple + +import numpy as np +from failslow.util.logging_utils import get_default_logger + +logger = get_default_logger(__name__) + + +class SlidingWindowNSigma(object): + def __init__(self, cfg: dict, metric_name): + self.training_window_size = cfg.get("training_window_size") + self.min_update_window_size = cfg.get("min_update_window_size") + self.min_std_val = cfg.get("min_std_val") + self.metric_name = metric_name + + self.bias = cfg.get("bias") + self.abs_bias = cfg.get("abs_bias") + self.nsigma_coefficient = cfg.get("nsigma_coefficient") + self.training_buffer = [] + self.cursion = 0 + self.lower_bound = None + self.upper_bound = None + self.mean = None + + self.detect_type = cfg.get("detect_type") + + # 设置专家阈值 + self.min_expert_lower_bound = cfg.get("min_expert_lower_bound", None) + self.max_expert_lower_bound = cfg.get("max_expert_lower_bound", None) + self.min_expert_upper_bound = cfg.get("min_expert_upper_bound", None) + self.max_expert_upper_bound = cfg.get("max_expert_upper_bound", None) + + # 实现boxplot算法 + def nsigma(self, np_data: list) -> Tuple[float, float, float]: + # 去掉一个最大值,去掉一个最小值之后计算标准差以及均值 + if len(np_data) >= 10: + np_data = sorted(np_data) + np_data = np_data[1:-1] + else: + logger.warning("[%s]Data length is not long enough for training, you got %s", self.metric_name, + len(np_data)) + mean = np.mean(np_data) + std = np.std(np_data) + + if std == 0: + std = self.min_std_val + + nsigma_upper_bound = mean + self.nsigma_coefficient * std + nsigma_lower_bound = mean - self.nsigma_coefficient * std + if self.abs_bias: + upper_offset = max(self.bias * nsigma_upper_bound, self.abs_bias) + lower_offset = max(self.bias * nsigma_lower_bound, self.abs_bias) + else: + upper_offset = self.bias * nsigma_upper_bound + lower_offset = self.bias * nsigma_lower_bound + + upper_bound = nsigma_upper_bound + upper_offset + lower_bound = nsigma_lower_bound - lower_offset + + # 将阈值下限圈定在min_expert_lower_bound 和 max_expert_lower_bound之间 + if 
self.max_expert_lower_bound is not None: + lower_bound = min(self.max_expert_lower_bound, lower_bound) + + if self.min_expert_lower_bound is not None: + lower_bound = max(self.min_expert_lower_bound, lower_bound) + + # 将阈值上限圈定在min_expert_lower_bound 和 max_expert_lower_bound之间 + if self.min_expert_upper_bound is not None: + upper_bound = max(self.min_expert_upper_bound, upper_bound) + + if self.max_expert_upper_bound is not None: + upper_bound = min(self.max_expert_upper_bound, upper_bound) + logger.debug("[%s] expert range: max_el:%s min_el:%s min_eu:%s max_el:%s.", self.metric_name, + self.max_expert_lower_bound, self.min_expert_lower_bound, self.min_expert_upper_bound, + self.max_expert_upper_bound) + logger.debug("[%s] Nsigma calculation: mean %s", self.metric_name, (mean, nsigma_upper_bound, upper_bound,)) + return mean, lower_bound, upper_bound + + def train(self): + if len(self.training_buffer) < self.min_update_window_size: + logger.info("Not enough data for training, current data_numbe" + "r:%s, except %s", len(self.training_buffer), self.min_update_window_size) + return + + self.mean, self.lower_bound, self.upper_bound = self.nsigma(self.training_buffer) + + def add_training_data(self, data): + if len(self.training_buffer) < self.training_window_size: + self.training_buffer.append(data) + self.cursion += 1 + else: + self.cursion = (self.cursion + 1) % self.training_window_size + self.training_buffer[self.cursion] = data + + def online_detecting(self, data_point, noisy_label=0): + if len(self.training_buffer) < self.min_update_window_size: + if noisy_label == 0: + self.add_training_data(data_point) + + return 0, self.lower_bound, self.upper_bound + self.train() + logger.debug("[%s] training buffer %s.", self.metric_name, self.training_buffer) + logger.debug("[%s] datapoint:%s, lower_bound:%s, upper_bound:%s", self.metric_name, data_point, + self.lower_bound, self.upper_bound) + if self.detect_type == 'lower_bound': + if data_point < self.lower_bound: + return 1, self.lower_bound, self.upper_bound + else: + self.add_training_data(data_point) + return 0, self.lower_bound, self.upper_bound + + elif self.detect_type == "upper_bound": + if data_point >= self.upper_bound: + return 1, self.lower_bound, self.upper_bound + else: + self.add_training_data(data_point) + return 0, self.lower_bound, self.upper_bound + else: + if data_point > self.upper_bound or data_point < self.lower_bound: + return 1, self.lower_bound, self.upper_bound + else: + self.add_training_data(data_point) + return 0, self.lower_bound, self.upper_bound diff --git a/failslow/failslow/alg/time_comp_detector/time_alg/ts_dbscan.py b/failslow/failslow/alg/time_comp_detector/time_alg/ts_dbscan.py new file mode 100644 index 0000000000000000000000000000000000000000..212c44f5e6a3b4da65383f090d86af750fbdb28f --- /dev/null +++ b/failslow/failslow/alg/time_comp_detector/time_alg/ts_dbscan.py @@ -0,0 +1,71 @@ +from collections import Counter + +import numpy as np +from sklearn.cluster import DBSCAN + + +class TSDBSCAN: + def __init__(self, cfg): + self.eps = cfg.get("eps", 0.03) + self.min_samples = cfg.get("min_samples", 5) + self.detect_type = cfg.get("detect_type", "bi_bound") + + def detect(self, ts_data): + ts_data = self.min_max_processing(ts_data) + clf = DBSCAN(eps=self.eps, min_samples=self.min_samples) + # Reshape data to be compatible with sklearn's input format + ts_data_reshaped = ts_data.reshape(-1, 1) + # 训练模型 + clf.fit(ts_data_reshaped) + + # 预测数据点的标签,1表示异常,-1表示正常,这里我们需要转换为0和1 + predictions = 
clf.fit_predict(ts_data_reshaped) + cluster_counts = Counter(predictions) + + # 找出数量最多的类别 + most_common_cluster = cluster_counts.most_common(1)[0][0] + + # 重标记:数量最多的类别为0,其他类别及-1(噪声)为1 + processed_clusters = np.where(predictions == most_common_cluster, 0, 1) + + if self.detect_type == "bi_bound": + return processed_clusters + + normal_data_avg = np.mean(ts_data[np.where(processed_clusters == 0)]) + + if self.detect_type == "upper_bound": + for i in np.where(processed_clusters == 1)[0]: + if ts_data[i] < normal_data_avg: + processed_clusters[i] = 0 + elif self.detect_type == "lower_bound": + for i in np.where(processed_clusters == 1)[0]: + if ts_data[i] > normal_data_avg: + processed_clusters[i] = 0 + + return processed_clusters + + @staticmethod + def min_max_processing(ts_data): + if np.min(ts_data) == np.max(ts_data): + ret = ts_data - np.min(ts_data) + return ret + + ret = (ts_data - np.min(ts_data)) / (np.max(ts_data) - np.min(ts_data) + 1e-5) + return ret + + +if __name__ == "__main__": + data = np.ones(360) * 1650 + data[200:250] -= 800 + + data[100:150] += 400 + + cfg_test = {"eps": 0.03, + "min_samples": 5, + "smooth_win": 4, + "k_thr": 0.02, + "detect_type": "upper_bound"} + + detector = TSDBSCAN(cfg_test) + + label = detector.detect(data) diff --git a/failslow/failslow/alg/time_comp_detector/ts_dbscan_detector.py b/failslow/failslow/alg/time_comp_detector/ts_dbscan_detector.py new file mode 100644 index 0000000000000000000000000000000000000000..c227829f4e08c521269085be762968f4e38cebb0 --- /dev/null +++ b/failslow/failslow/alg/time_comp_detector/ts_dbscan_detector.py @@ -0,0 +1,66 @@ +import numpy as np + +from .time_alg.ts_dbscan import TSDBSCAN + + +class TSDBSCANDetector: + def __init__(self, metric_names, cfg): + self.detectors = {} + self.metric_names = metric_names + + self.k_thr = cfg.get("k_thr", 0.02) + self.smooth_win = cfg.get("smooth_win", 4) + self.cfg = cfg + + def check_smooth(self, infer_data): + data_size = len(infer_data) + if data_size < self.smooth_win: + smooth_win = data_size // 2 + else: + smooth_win = self.smooth_win + + return smooth_win + + def fit(self, normal_datas): + for node_id, normal_data in normal_datas.items(): + node_detector = {} + for metric_name in self.metric_names: + ts_dbscan_detector = TSDBSCAN(cfg=self.cfg) + node_detector[metric_name] = {"detector": ts_dbscan_detector} + + self.detectors[node_id] = node_detector + + def predict(self, infer_datas): + anomaly_nodes_info = {} + anomaly_nodes_location = {} + locations = {} + for node_id, infer_data in infer_datas.items(): + locations[node_id] = {} + node_detector = self.detectors[node_id] + + for metric_name in self.metric_names: + detector_info = node_detector.get(metric_name, {}) + if not detector_info: + continue + + detector = detector_info["detector"] + # 数据平滑 + smooth_win = self.check_smooth(infer_data) + infer_metric_data = infer_data[metric_name].rolling(window=smooth_win).mean().bfill().ffill().values + detect_result = detector.detect(infer_metric_data) + locations[node_id][metric_name] = detect_result + anomlay_sum = np.sum(detect_result) + + if anomlay_sum > len(infer_metric_data) * self.k_thr: + anomaly_nodes_info.setdefault(node_id, []).append(metric_name) + anomaly_nodes_location.setdefault(node_id, []).append(detect_result) + + valid_metrics = [] + anomaly_nodes = [] + + # 返回时间有异常的节点和 + for anomaly_node, metircs_list in anomaly_nodes_info.items(): + valid_metrics += metircs_list + anomaly_nodes.append(anomaly_node) + + return anomaly_nodes, valid_metrics, locations diff --git 
a/failslow/failslow/dataloader/__init__.py b/failslow/failslow/dataloader/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..8dd3c3c27493dd8daaa86fdbedc8a4fb8ab21e3e --- /dev/null +++ b/failslow/failslow/dataloader/__init__.py @@ -0,0 +1,18 @@ +# coding=utf-8 +""" +Copyright (c) Huawei Technologies Co., Ltd. 2020-2028. All rights reserved. +Description: +FileName:__init__.py.py +Author: h00568282/huangbin +Create Date: 2025/5/9 14:25 +Notes: + +""" + + +def main(): + pass + + +if __name__ == "__main__": + main() diff --git a/failslow/failslow/dataloader/marker_data_reader.py b/failslow/failslow/dataloader/marker_data_reader.py new file mode 100644 index 0000000000000000000000000000000000000000..d4842fc9a76c25c0da66455b75847aa246a0817d --- /dev/null +++ b/failslow/failslow/dataloader/marker_data_reader.py @@ -0,0 +1,414 @@ +import re +import os +import datetime +import numpy as np +import pandas as pd +import matplotlib.pyplot as plt +from collections import Counter +from typing import List, Dict, Tuple + +from failslow.util.logging_utils import get_default_logger +from failslow.util.constant import CommGroup, TableItem +from failslow.process.convert_json2csv import convert_jsons2csv + +logger = get_default_logger(__name__) + + +def extract_step_time_from_log(root_path): + log_path = os.path.join(root_path, "train_llama3_8b_preload.log") + with open(log_path, "r") as f: + data = f.read() + + pattern = r'elapsed time per iteration \(ms\): (\d+\.\d+)' + matches = re.findall(pattern, data) + + elapsed_times = [float(match) for match in matches] + logger.info(f"gt length: {len(elapsed_times)}") + logger.info(f"{elapsed_times}") + + return elapsed_times + + +class MarkerDataloader: + def __init__(self, root_path, start_time=None, end_time=None): + self._root_path = root_path + self.start_time = start_time + self.end_time = end_time + convert_jsons2csv(self._root_path) + + self.csv_files = self.get_csv_files() + self.ranks = self.get_all_ranks() + self.id2name_maps = dict() + self.local_d_files = dict() + self.local_op_launch_files = dict() + + @staticmethod + def read_csv(file_path): + if os.path.exists(file_path): + data_df = pd.read_csv(file_path) + else: + data_df = None + + return data_df + + def read_local_device_df_by_rank(self, rank: int): + file = f"hccl_activity.{rank}.csv" + if file in self.csv_files: + local_device_path = self.local_d_files.get(file, None) + if local_device_path: + return pd.read_csv(local_device_path) + + return None + + def read_device_df_by_ranks(self, ranks: List): + comm_results: Dict = {} + for rank in ranks: + local_device_df = self.read_local_device_df_by_rank(rank) + if local_device_df is not None: + comm_results[rank] = local_device_df + + return comm_results + + def read_local_op_launch_df_by_rank(self, rank: int): + file = f"hccl_activity.{rank}.csv" + if file in self.csv_files: + local_device_path = self.local_op_launch_files.get(file, None) + if local_device_path: + return pd.read_csv(local_device_path) + + return None + + def read_op_launch_df_by_ranks(self, ranks: List): + comm_results: Dict = {} + for rank in ranks: + local_device_df = self.read_local_op_launch_df_by_rank(rank) + if local_device_df is not None: + comm_results[rank] = local_device_df + + return comm_results + + def _collect_all_csv_data(self) -> Dict: + comm_results: Dict = {} + for file in self.csv_files: + rank = int(file.split('.')[-2]) + file_path = os.path.join(self._root_path, file) + comm_results[rank] = pd.read_csv(file_path, delimiter=';') + + 
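# comm_results maps each rank id to the DataFrame parsed from its hccl_activity.<rank>.csv file +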
return comm_results + + def get_csv_files(self): + if not os.path.exists(self._root_path): + logger.warning(f"Data path: {self._root_path} not exist, please confirm input data.") + return [] + return [file for file in os.listdir(self._root_path) if file.endswith("csv") and "device" not in file and "op_launch" not in file] + + def get_all_ranks(self) -> List: + ranks = [] + for csv_file in self.csv_files: + rank = int(csv_file.split('.')[-2]) + ranks.append(rank) + logger.info(f"AI model all ranks: {ranks}") + return ranks + + def create_comm_groups(self, comm_names: List[str], slice_indices: List[int], comm_ops: List[str], rank, + count_ops) -> List[CommGroup]: + comm_groups = [] + for comm_name, slice_index, comm_op in zip(comm_names, slice_indices, comm_ops): + ''' megatron slice index 0 for all ranks time sync''' + if slice_index == 0: + continue + count_op = count_ops[comm_name] + comm_groups.append(CommGroup(comm_name, slice_index, comm_op, rank, count_op)) + + return comm_groups + + def extend_group_ranks(self, all_comm_groups: List[CommGroup], new_comm_groups: List[CommGroup]) -> None: + if all_comm_groups: + extra_comm_groups = [] + for new_comm_group in new_comm_groups: + for comm_group in all_comm_groups: + if new_comm_group == comm_group: + comm_group.extend_group_rank(new_comm_group.group_ranks) + break + else: + extra_comm_groups.append(new_comm_group) + all_comm_groups.extend(extra_comm_groups) + else: + all_comm_groups.extend(new_comm_groups) + + def extract_device_df(self, input_df: pd.DataFrame) -> pd.DataFrame: + ''' sourcekind 1表示 device, 0表示host ''' + df_device = input_df[input_df[TableItem.source_kind] == 1] + + return df_device + + def extract_op_launch_df(self, input_df: pd.DataFrame) -> pd.DataFrame: + ''' + source_kind: 0 host, 1 device + 取: 0的max 1的min + :param input_df: + :return: + ''' + mode_0_max_timestamp = input_df[input_df[TableItem.source_kind] == 0].groupby(TableItem.id)[ + TableItem.timestamp].idxmax() + result_mode_0 = input_df.loc[mode_0_max_timestamp] + + mode_1_min_timestamp = input_df[input_df[TableItem.source_kind] == 1].groupby(TableItem.id)[ + TableItem.timestamp].idxmin() + result_mode_1 = input_df.loc[mode_1_min_timestamp] + + final_result = pd.concat([result_mode_0, result_mode_1]).sort_values(by=TableItem.id) + + return final_result + + def extract_id2name_map(self, csv_file: str, input_df: pd.DataFrame) -> None: + id2name_map = input_df[input_df[TableItem.name].notna()].set_index(TableItem.id)[TableItem.name].to_dict() + self.id2name_maps[csv_file] = id2name_map + + def extract_comm_domain(self): + all_comm_groups = [] + for csv_file in self.csv_files: + csv_path = os.path.join(self._root_path, csv_file) + data_df = self.read_csv(csv_path) + self.extract_id2name_map(csv_file, data_df) + + device_df = self.extract_device_df(data_df) + op_launch_df = self.extract_op_launch_df(data_df) + device_ids = int(device_df[TableItem.device_id].unique()[0]) + + # 分列以及生成start,end timestamp + device_df = self.process_df(device_df, csv_file) + op_launch_df = self.process_df(op_launch_df, csv_file) + self.save_device_df(device_df, csv_file) + self.save_op_launch_df(op_launch_df, csv_file) + comm_groups_ids = device_df[TableItem.ex_comm_group].unique() + selected_indices, comm_ops = self.get_ops_by_comm_name(comm_groups_ids, device_df) + count_ops = self.get_count_ops(comm_groups_ids, device_df) + + logger.info(f"src file:{csv_file}, selected comm op index: {selected_indices}, comm ops: {comm_ops}") + comm_groups = self.create_comm_groups(comm_groups_ids, 
selected_indices, comm_ops, device_ids, count_ops) + self.extend_group_ranks(all_comm_groups, comm_groups) + + all_comm_groups = self.get_fp_comm_groups(all_comm_groups) + return all_comm_groups + + def process_df(self, data_df: pd.DataFrame, csv_file: str, op_ext=None) -> pd.DataFrame: + """ + 对 DataFrame 进行处理,包括分组聚合、列拆分、添加新列等操作 + """ + id2name_dict = self.id2name_maps[csv_file] + data_df.loc[:, TableItem.name] = data_df[TableItem.id].map(id2name_dict) + df = data_df.groupby(TableItem.id).agg({ + TableItem.timestamp: ['min', 'max'], + TableItem.kind: 'first', + TableItem.source_kind: 'first', + TableItem.name: 'first', + }).reset_index() + df.columns = [TableItem.id, TableItem.ex_start_ts, TableItem.ex_end_ts, TableItem.kind, TableItem.source_kind, + TableItem.name] + + metric_name = TableItem.ex_comm_op + if op_ext: + metric_name = f"{metric_name}_launch" + if "!" in df["Name"].iloc[0]: + df[[metric_name, TableItem.ex_comm_group, TableItem.ex_data_type, TableItem.ex_count]] = df[ + TableItem.name].str.replace('comm:', '').str.split('!', expand=True) + else: + df[[metric_name, TableItem.ex_comm_group, TableItem.ex_data_type, TableItem.ex_count]] = df[ + TableItem.name].str.replace('comm:', '').str.split(',', expand=True) + + return df + + def save_device_df(self, device_df: pd.DataFrame, csv_file: str) -> None: + csv_path = os.path.join(self._root_path, csv_file) + save_path = f"{csv_path[:-4]}_device.csv" + self.local_d_files[csv_file] = save_path + + # filter valid data + # if self.start_time and self.end_time: + # start_time = self.start_time * MS_TO_NS + # end_time = self.end_time * MS_TO_NS + # device_df = device_df[ + # (device_df[TableItem.ex_end_ts] >= self.start_time) & (device_df[TableItem.ex_end_ts] <= end_time)] + device_df.to_csv(save_path, index=False) + + def save_op_launch_df(self, op_launch_df: pd.DataFrame, csv_file: str) -> None: + csv_path = os.path.join(self._root_path, csv_file) + save_path = f"{csv_path[:-4]}_op_launch.csv" + self.local_op_launch_files[csv_file] = save_path + + # filter valid data + # if self.start_time and self.end_time: + # start_time = self.start_time * MS_TO_NS + # end_time = self.end_time * MS_TO_NS + # device_df = device_df[ + # (device_df[TableItem.ex_end_ts] >= self.start_time) & (device_df[TableItem.ex_end_ts] <= end_time)] + op_launch_df.to_csv(save_path, index=False) + + def get_fp_comm_groups(self, comm_groups: List[CommGroup]): + # group_rank: comm_group + # 相同rank的通信组仅保留一组作为前向 + fp_comm_groups = {} + for comm_group in comm_groups: + group_ranks = str(comm_group.group_ranks) + if group_ranks not in fp_comm_groups: + fp_comm_groups[group_ranks] = comm_group + else: + in_fp_comm_group = fp_comm_groups[group_ranks] + in_count_ops = in_fp_comm_group.count_ops + in_ops_list = list(in_count_ops.keys()) + + count_ops = comm_group.count_ops + ops_list = list(count_ops.keys()) + if len(ops_list) > len(in_ops_list): + fp_comm_groups[group_ranks] = comm_group + elif len(ops_list) == len(in_ops_list): + # judge by count + in_num_per_ops = Counter(in_count_ops) + num_per_ops = Counter(count_ops) + + in_large_num_per_count = list(in_num_per_ops.values())[0] + large_num_per_count = list(num_per_ops.values())[0] + if large_num_per_count > in_large_num_per_count: + fp_comm_groups[group_ranks] = comm_group + + logger.info(f"comm groups: {len(comm_groups)}, fp comm groups: {len(fp_comm_groups)}") + return list(fp_comm_groups.values()) + + def _simple_match_groups(self, all_comm_ids: Dict, all_devices_id: Dict): + comm_groups = {} + for csv_file, comm_ids 
in all_comm_ids.items(): + devices_id = all_devices_id[csv_file] + for comm_id in comm_ids: + if comm_id in comm_groups.keys(): + comm_groups[comm_id].append(devices_id) + else: + comm_groups[comm_id] = [devices_id] + logger.info(f"comm groups: {comm_groups}") + return comm_groups + + def get_count_ops(self, comm_group_ids: List, data_df: pd.DataFrame) -> Dict: + count_ops = {} + for comm_group_id in comm_group_ids: + count_ops[comm_group_id] = {} + group_data_df = data_df[data_df[TableItem.ex_comm_group] == comm_group_id] + ops = group_data_df[TableItem.ex_comm_op].unique() + for op in ops: + count_ops[comm_group_id][op] = len(group_data_df[group_data_df[TableItem.ex_comm_op] == op]) + + return count_ops + + def get_ops_by_comm_name(self, comm_group_ids: List, data_df: pd.DataFrame) -> Tuple[List, List]: + '''表内所有的comm_groups找到第一个索引的索引号和算子''' + selected_indices = [] + comm_ops = [] + for comm_id in comm_group_ids: + mask = data_df[TableItem.ex_comm_group] == comm_id + index = int(data_df[mask].index[0]) + comm_ops.append(data_df.loc[index][TableItem.ex_comm_op]) + selected_indices.append(index) + + return selected_indices, comm_ops + + def get_broadcast_ops(self, broadcast_ops="HcclBroadcast"): + ''' Use broadcast time estimate step time ''' + # for csv_file in self.csv_files: + # csv_path = os.path.join(self._root_path, csv_file) + # data_df = self.read_csv(csv_path) + # data_df['start_stamp'] = data_df['start'].apply(self.convert_timestamp2datetime) + # data_df['end_stamp'] = data_df['end'].apply(self.convert_timestamp2datetime) + # data_df.to_csv(f"new_{csv_file}", index=False) + + csv_path = os.path.join(self._root_path, self.csv_files[0]) + data_df = self.read_csv(csv_path) + data_df['start_stamp'] = data_df[TableItem.ex_start_ts].apply(self.convert_timestamp2datetime) + # data_df.to_csv("./broadcast_df.csv", index=False) + # n = len(data_df) + # quarter = n // 4 + # data_df = data_df[quarter:] + mask = data_df[TableItem.name] == broadcast_ops + broadcast_df = data_df[mask] + logger.info(f"broadcast df length: {len(broadcast_df)}") + broadcast_df['start_stamp'] = broadcast_df[TableItem.ex_start_ts].apply(self.convert_timestamp2datetime) + # broadcast_df.to_csv("./broadcast_df.csv", index=False) + broadcast_df = self._filter_conti_index(broadcast_df) + step_time = np.array(broadcast_df[TableItem.ex_start_ts].diff() / 1e6)[2:] + logger.info(f"estimate length: {len(broadcast_df)}") + + self.plot_step_time(step_time) + + def convert_timestamp2datetime(self, data): + + dt_object = datetime.datetime.fromtimestamp(data / (1e9 * 1.0)) + # 格式化日期为字符串 + date_time = dt_object.strftime('%Y-%m-%d %H:%M:%S') + return date_time + + def plot_step_time(self, data): + step_time = extract_step_time_from_log(self._root_path) + mean_value = np.mean(data) + plt.figure(figsize=(10, 6)) + plt.plot(step_time[10:110], label='GT', marker='^') + plt.plot(data[10:110], label='extimate_time', marker='o') + # 绘制均值线 + # plt.axhline(y=mean_value, color='r', linestyle='--', label='Mean') + # 设置图表标题和坐标轴标签 + plt.title('Estimate step time by dataloader.') + plt.xlabel('step index') + plt.ylabel('latency(ms)') + plt.legend() + plt.grid(True) + plt.show() + + @staticmethod + def _filter_deviation_data(data): + ''' filter data exceed or lower than 3 * means ''' + processed_data = [] + + mean_value = np.mean(data) + for value in data: + if value > 3 * mean_value or value < 1 / 3 * mean_value: + processed_data.append(mean_value) + else: + processed_data.append(value) + + return processed_data + + @staticmethod + def 
_filter_conti_index(df): + # 计算索引的差值 + diff = df.index.to_series().diff() + # 标记连续索引组 + groups = (diff != 1).cumsum() + # 每组仅保留第一行 + result = df.groupby(groups).first() + + return result + + +if __name__ == "__main__": + # _root_path = "data/tp4dp1_1.5b" + # _root_path = "data/tp2pp4" + # _root_path = "data/json_tp4dp1" + _root_path = "data/jdata/json_tp2dp2pp2" + _root_path = "data/jdata/json_tp1dp2pp4" + _root_path = "data/jdata/json_tp1dp4pp2" + _root_path = "data/jdata/json_tp2dp2pp2" + # _root_path = "data/jdata/json_tp2dp4pp1" # wrong + # _root_path = "data/jdata/json_tp4dp1pp2" + # _root_path = "data/jdata/json_tp4dp2pp1" + # _root_path = "data/jdata/json_tp8dp1pp1" + + convert_jsons2csv(_root_path) + dataloader = MarkerDataloader(_root_path) + # all_comm_groups = dataloader.get_broadcast_ops() + logger.info(f"{dataloader.csv_files}") + from restore_comm import RestoreComm + + comm_groups = dataloader.extract_comm_domain() + ranks = dataloader.ranks + restore_comm = RestoreComm(comm_groups, ranks) + restore_comm() + logger.info(f"{restore_comm.comm_domain}") + # for comm_group in comm_groups: + # print(comm_group) diff --git a/failslow/failslow/dataloader/restore_comm.py b/failslow/failslow/dataloader/restore_comm.py new file mode 100644 index 0000000000000000000000000000000000000000..614eb211396fb58f5321de17e6305332845f526b --- /dev/null +++ b/failslow/failslow/dataloader/restore_comm.py @@ -0,0 +1,150 @@ +# coding=utf-8 +""" +Copyright (c) Huawei Technologies Co., Ltd. 2020-2028. All rights reserved. +Description: +FileName:recovery_comm.py +Author: h00568282/huangbin +Create Date: 2025/3/14 15:34 +Notes: + +""" +from typing import List +from failslow.util.constant import CommGroup, CommOpType +from failslow.util.logging_utils import get_default_logger + +logger = get_default_logger(__name__) + +class RestoreComm: + def __init__(self, comm_groups: List[CommGroup], ranks: List): + self.comm_domain = {"tp": [], "dp": [], "pp": [], "ep": []} + self.comm_groups = comm_groups + self.ranks = ranks + self.default_comm_list = self.default_group() + def default_group(self): + return [[rank] for rank in self.ranks] + + def __call__(self, *args, **kwargs): + for comm_group in self.comm_groups: + count_ops = comm_group.count_ops + group_ranks = comm_group.group_ranks + ops_list = list(count_ops.keys()) + # first and last has broadcast + if CommOpType.broadcast in ops_list: + ops_list.remove(CommOpType.broadcast) + logger.info(f"comm group has {group_ranks} with {ops_list}") + + if CommOpType.send in ops_list or CommOpType.receive in ops_list or CommOpType.batch_send_recv in ops_list: + self.add_group_ranks("pp", group_ranks) + elif CommOpType.reduce_scatter in ops_list and len(ops_list) > 1: + self.add_group_ranks("tp", group_ranks) + elif CommOpType.all_reduce in ops_list and len(ops_list) == 1: + self.add_group_ranks("dp", group_ranks) + + for comm_domain, value in self.comm_domain.items(): + if not value: + self.comm_domain[comm_domain] = self.default_comm_list + + self.fix_dp_comm_domain() + + def add_group_ranks(self, comm_group_type: str, group_rank: List): + current_group_ranks = self.comm_domain[comm_group_type] + if group_rank not in current_group_ranks: + skip_flag = False + insert_flag = False + for index, current_group_rank in enumerate(current_group_ranks): + if self.is_subset_using_set(current_group_rank, group_rank): + skip_flag = True + break + + if self.is_subset_using_set(group_rank, current_group_rank): + insert_flag = True + break + if insert_flag: + 
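# the new group contains an already stored group as a subset, so replace the stored entry with the larger rank list +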
current_group_ranks[index] = group_rank + elif not skip_flag: + current_group_ranks.append(group_rank) + + def test_fix_dp_comm_domain(self): + self.ranks = [i for i in range(32)] + tp_size = 4 + pp_size = 2 + dp_size = 4 + tp_groups = len(self.ranks) // tp_size + dp_groups = len(self.ranks) // dp_size + pp_groups = len(self.ranks) // pp_size + + tp_groups = [[i + i_group * tp_size for i in range(tp_size)] for i_group in range(tp_groups)] + dp_groups = [self.ranks] + pp_groups = [[i_group + tp_size*dp_size * i for i in range(pp_size)] for i_group in range(pp_groups)] + self.comm_domain["tp"] = tp_groups + self.comm_domain["dp"] = dp_groups + self.comm_domain["pp"] = pp_groups + logger.info(f"before: {self}") + + self.fix_dp_comm_domain() + + def is_valid_dp_domain(self, dp_list: List) -> bool: + if not dp_list: + return False + + first_length = len(dp_list[0]) + for sub_lst in dp_list: + if len(sub_lst) != first_length: + return False + + return True + + def fix_dp_comm_domain(self): + total_ranks = len(self.ranks) + + if not total_ranks: + logger.warning(f"There has 0 ranks, please check input data.") + return + + if self.comm_domain["pp"]: + pp_size = len(self.comm_domain["pp"][0]) + else: + logger.warning(f"There has pipeline paralel with size 0, which should be larger than 1.") + return + + tp_dp_size = total_ranks / pp_size + + if self.comm_domain["tp"]: + tp_size = len(self.comm_domain["tp"][0]) + else: + logger.warning(f"There has tensor paralel with size 0, which should be larger than 1.") + return + + if self.comm_domain["dp"]: + dp_size = len(self.comm_domain["dp"][0]) + else: + logger.warning(f"There has data paralel with size 0, which should be larger than 1.") + return + + real_dp_size = int(tp_dp_size / tp_size) + is_valid_dp_domain = self.is_valid_dp_domain(self.comm_domain["dp"]) + + if is_valid_dp_domain and (dp_size == real_dp_size): + return + # fix dp + new_dp_groups = [] + num_dp_groups = int(total_ranks / real_dp_size) + for i_group in range(num_dp_groups): + new_dp_group = [i_group + tp_size * index for index in range(real_dp_size)] + new_dp_groups.append(new_dp_group) + + self.comm_domain["dp"] = new_dp_groups + + + @staticmethod + def is_subset_using_set(A, B): + return set(B).issubset(set(A)) + + def __repr__(self): + return f"comm domain: {self.comm_domain}" + + +if __name__ == "__main__": + restore_comm = RestoreComm([], []) + restore_comm.test_fix_dp_comm_domain() + print(restore_comm) diff --git a/failslow/failslow/fail_slow_detection.py b/failslow/failslow/fail_slow_detection.py new file mode 100644 index 0000000000000000000000000000000000000000..a57b50aedf994ce6fb473458e1bfdedd64ec18bf --- /dev/null +++ b/failslow/failslow/fail_slow_detection.py @@ -0,0 +1,243 @@ +''' +env +FAIL_SLOW_STOP: control fail slow stop +''' +import re +import json +import os +import time +from datetime import datetime, timezone +import pandas as pd +from typing import Dict + +from failslow.util.constant import AnomalyType +from failslow.util.logging_utils import get_default_logger + +logger = get_default_logger(__name__) + + +def process_training_log(log_file_path: str, step_data_path: str) -> pd.DataFrame: + df = None + try: + with open(log_file_path, 'r', encoding='utf-8') as file: + log_lines = file.readlines() + + valid_lines = [line for line in log_lines if 'per_step_time:' in line] + timestamp_pattern = r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d+)' + step_time_pattern = r'per_step_time: (\d+)ms' + + # 准备数据 + timestamps = [] + step_times = [] + + for line in valid_lines: + # 
extract the timestamp + timestamp_match = re.search(timestamp_pattern, line) + # extract per_step_time + step_time_match = re.search(step_time_pattern, line) + + if timestamp_match and step_time_match: + timestamp_str = timestamp_match.group(1) + step_time = step_time_match.group(1) + + # normalize the datetime format: replace the comma with a decimal point + timestamp_str = timestamp_str.replace(',', '.') + # parse the datetime string + timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S.%f') + timestamps.append(timestamp) + step_times.append(float(step_time)) + + data = { + 'time': timestamps, + 'step_time': step_times + } + df = pd.DataFrame(data) + + df.to_csv(step_data_path, index=False) + logger.info(f"step data successfully written to {step_data_path}") + + except FileNotFoundError: + logger.error(f"log file not found: {log_file_path}") + except Exception as e: + logger.error(f"error while processing the log file: {e}") + + return df + + +def detect_step_time_anomalies(data_df: pd.DataFrame, model_args: Dict): + """ + Detect anomalies in the step_time series and record anomaly information. + + :param data_df: DataFrame with 'time' and 'step_time' columns + :param model_args: model config; uses steps_window_size (moving-average window), k_sigma (how many standard deviations above the moving average counts as anomalous) and anomaly_degree_thr (minimum relative slowdown) + :return: dict summarizing whether anomalies were found, how many, and the per-step details + """ + window_size = model_args.get("steps_window_size", 5) + k_sigma_threshold = model_args.get("k_sigma", 2) + anomaly_degree_thr = model_args.get("anomaly_degree_thr", 0.2) + anomalies = [] + step_times = data_df["step_time"] + timestamps = data_df["time"] + # print(f"training step time: {step_times}") + for i in range(len(step_times)): + if i < window_size: + continue + + moving_average = sum(step_times[i - window_size:i]) / window_size + + variance = sum((x - moving_average) ** 2 for x in step_times[i - window_size:i]) / window_size + std_dev = variance ** 0.5 + + current_anomaly = False + current_step_time = step_times[i] + + diff = current_step_time - moving_average + if diff > k_sigma_threshold * std_dev: + anomaly_degree = diff / moving_average + if anomaly_degree > anomaly_degree_thr: + current_anomaly = True + + if current_anomaly and i + 1 < len(step_times): + next_step_time = step_times[i + 1] + next_diff = next_step_time - moving_average + if next_diff > k_sigma_threshold * std_dev: + next_anomaly_degree = next_diff / moving_average + if next_anomaly_degree > anomaly_degree_thr: + anomalies.append( + {"training_step": i, "anomaly_time": timestamps[i].strftime('%Y-%m-%d %H:%M:%S'), + "anomaly_degree": round(anomaly_degree, 3), + "anomaly_training_time": f"{current_step_time}ms", + "normal_training_time": f"{moving_average}ms"}) + + anomaly_info = {} + if anomalies: + anomaly_info["is_anomaly"] = True + anomaly_info["anomaly_count_times"] = len(anomalies) + anomaly_info["anomaly_info"] = anomalies + else: + anomaly_info["is_anomaly"] = False + anomaly_info["anomaly_count_times"] = 0 + anomaly_info["anomaly_info"] = [] + anomaly_info["start_time"] = int(timestamps.iloc[0].timestamp()) + anomaly_info["end_time"] = int(timestamps.iloc[len(timestamps) - 1].timestamp()) + return anomaly_info + + +def write_anomaly_info(anomaly_info: Dict, fail_slow_perception_path: str, file_ext: str = ".json"): + now_time = datetime.now(timezone.utc).astimezone().astimezone() + now_timestamp = int(now_time.timestamp()) + anomaly_type = anomaly_info.get("anomaly_type", AnomalyType.fail_slow) + fail_slow_perception_path = os.path.join(fail_slow_perception_path, + f"fail_slow_perception_result_{anomaly_type}_{now_timestamp}{file_ext}") + + try: + with open(fail_slow_perception_path, 'w', encoding='utf-8') as json_file: + json.dump(anomaly_info, json_file, ensure_ascii=False, indent=4) + logger.info(f"writing result to {fail_slow_perception_path}") + except Exception as e: + logger.error(f"writing result failed: {e}") + + +def run_slow_node_perception(args: Dict): + training_log = args.get("training_log", "./log/rank0_mindformer.log") + step_data_path = args.get("step_data_path", "/log/step_data.csv") + fail_slow_perception_result = args.get("fail_slow_perception_path", "/log") + os.makedirs(fail_slow_perception_result, exist_ok=True) + + task_stable_step = args.get("task_stable_step", 2) # just for first time detection + fail_slow_span_mins = args.get("fail_slow_span_mins", 0.1) + min_startup_detection_steps = args.get("min_startup_detection_steps", 10) + hang_times_thr = args.get("hang_times_thr", 5) + + detecting_range_steps = [0, 0] + first_flag = False + hang_info = [] + next_detection_timestamp = None + timer_flag = False + while True: + # now_time = datetime.now(timezone.utc).astimezone().astimezone() + # now_timestamp = now_time.timestamp() + # if next_detection_timestamp and now_timestamp > next_detection_timestamp: + # print("waiting run detection.....") + # else: + # continue + # next_detection_timestamp = (now_time + timedelta(minutes=fail_slow_span_mins)).timestamp() + if timer_flag: + time.sleep(2) + timer_flag = True + + data = process_training_log(training_log, step_data_path) + training_steps = len(data) if data is not None else 0 + if not training_steps: + logger.info(f"training data is empty.") + continue + + # if the step count has not advanced since the last poll, record the time as a potential hang point + # (the model init phase is excluded: hang detection only runs after the first detection round) + if training_steps == (detecting_range_steps[1]) and detecting_range_steps[1]: + logger.info("start hang detection") + + now_time = datetime.now(timezone.utc).astimezone().astimezone() + format_str = '%Y-%m-%d %H:%M:%S %z' + now_time_str = now_time.strftime(format_str) + hang_info.append(now_time_str) + if len(hang_info) > hang_times_thr: + # record hang + anomaly_info = { + "anomaly_type": AnomalyType.hang, + "detect_point": hang_info, + "hang_minutes": fail_slow_span_mins * hang_times_thr + } + logger.info("hang detection found that the training process is hung.") + write_anomaly_info(anomaly_info, fail_slow_perception_result) + break + continue + else: + hang_info = [] + + new_detecting_range_steps = [0, 0] + if not detecting_range_steps[1]: + first_flag = True + new_detecting_range_steps[1] = training_steps + else: + if first_flag: + # second detection round: keep the same start step + new_detecting_range_steps[0] = detecting_range_steps[0] + else: + new_detecting_range_steps[0] = (detecting_range_steps[0] + detecting_range_steps[1]) // 2 + first_flag = False + new_detecting_range_steps[1] = training_steps + + range_steps = new_detecting_range_steps[1] - new_detecting_range_steps[0] + if range_steps < min_startup_detection_steps: + logger.warning( + f"[Warning] detecting range step {range_steps} should be larger than {min_startup_detection_steps}.") + continue + + detecting_range_steps = new_detecting_range_steps + if first_flag: + detecting_range_steps[0] = task_stable_step + logger.info(f"Detection data: {detecting_range_steps}.") + data = data.loc[detecting_range_steps[0]: detecting_range_steps[1]].reset_index(drop=True) + anomaly_info = detect_step_time_anomalies(data, args) + + anomaly_info["anomaly_type"] = AnomalyType.fail_slow + write_anomaly_info(anomaly_info, fail_slow_perception_result) + + fail_slow_stop_flag = os.getenv('FAIL_SLOW_STOP', 'False').lower() == "true" + if fail_slow_stop_flag: + logger.info("User set FAIL_SLOW_STOP; stopping fail slow detection.") + break + + +if __name__ == "__main__": + ''' Periodic detection loop. ''' + with
open("../config/model_config.json", 'r', encoding='utf-8') as reader: + model_args = json.load(reader) + run_slow_node_perception(model_args) + + # thread = threading.Thread(target=run_slow_node_perception, args=(model_args,)) + # thread.start() + # + # thread.join() diff --git a/failslow/failslow/main.py b/failslow/failslow/main.py new file mode 100644 index 0000000000000000000000000000000000000000..a182bfd1eb17cf2c299dda07ee3a883e0f20665e --- /dev/null +++ b/failslow/failslow/main.py @@ -0,0 +1,107 @@ +# coding=utf-8 +""" +Copyright (c) Huawei Technologies Co., Ltd. 2020-2028. All rights reserved. +Description: +FileName:main.py +Author: h00568282/huangbin +Create Date: 2025/4/21 9:36 +Notes: + +""" +import os +import time +import json +import traceback +from datetime import datetime, timezone + +from typing import Dict +from failslow.response import AIJobDetectResult + +from failslow.slow_node_detection import SlowNodeDetector +from failslow.util.logging_utils import get_default_logger +from failslow.util.constant import METRIC_CONFIG_PATH, MODEL_CONFIG_PATH, AnomalyType, TIMESTAMP_MS_NUM, HOUR_TO_SECONDS + +logger = get_default_logger(__name__) + + +def get_slow_node_detection_time_range(model_args): + end_time = None + start_time = None + with_fail_slow = model_args.get("with_fail_slow", False) + slow_node_detection_range_times = model_args.get("slow_node_detection_range_times", []) + # fail_slow_perception_result.json + fail_slow_perception_path = model_args.get("fail_slow_perception_path", "/log") + slow_node_detection_time_span_hours = model_args.get("slow_node_detection_time_span_hours", 0.5) + if slow_node_detection_range_times: + start_time = slow_node_detection_range_times[0] + end_time = slow_node_detection_range_times[1] + else: + if with_fail_slow: + if os.path.exists(fail_slow_perception_path): + file_slow_result_files = [file for file in os.listdir(fail_slow_perception_path) if + AnomalyType.fail_slow in file] + if file_slow_result_files: + file_slow_result_files.sort(reverse=True) + + latest_slow_result_file = file_slow_result_files[0] + latest_slow_result_path = os.path.join(fail_slow_perception_path, latest_slow_result_file) + with open(latest_slow_result_path, 'r', encoding='utf-8') as reader: + fail_slow_result = json.load(reader) + start_time = fail_slow_result.get("start_time") + end_time = fail_slow_result.get("end_time") + + if end_time is None: + # 若自动触发,当前时间往前推两个小时开始检测 + end_time = int(time.time() * TIMESTAMP_MS_NUM) + time_span = int(int(slow_node_detection_time_span_hours) * HOUR_TO_SECONDS * TIMESTAMP_MS_NUM) + start_time = end_time - time_span + + logger.info( + f"fail slow used:" + str(with_fail_slow) + ", Start time: " + str(start_time) + ", End timestamp: " + str( + end_time)) + return start_time, end_time + + +def write_anomaly_info(anomaly_info: Dict, fail_slow_perception_path: str, file_ext: str = ".json"): + now_time = datetime.now(timezone.utc).astimezone().astimezone() + now_timestamp = int(now_time.timestamp()) + anomaly_type = anomaly_info.get("anomaly_type", AnomalyType.fail_slow) + fail_slow_perception_path = os.path.join(fail_slow_perception_path, + f"fail_slow_perception_result_{anomaly_type}_{now_timestamp}{file_ext}") + + try: + with open(fail_slow_perception_path, 'w', encoding='utf-8') as json_file: + json.dump(anomaly_info, json_file, ensure_ascii=False, indent=4) + print(f"writing result to {fail_slow_perception_path}") + except Exception as e: + print(f"writing result fail: {e}") + + +def main(): + with open(METRIC_CONFIG_PATH, 'r', 
encoding='utf-8') as reader: + metric_args = json.load(reader) + with open(MODEL_CONFIG_PATH, 'r', encoding='utf-8') as reader: + model_args = json.load(reader) + + start_time, end_time = get_slow_node_detection_time_range(model_args) + detector = SlowNodeDetector(metric_args, model_args, start_time, end_time) + response: AIJobDetectResult = detector.run() + logger.info(f"response: {response}") + slow_node_detection_path = model_args.get("slow_node_detection_path", "/log") + os.makedirs(slow_node_detection_path, exist_ok=True) + slow_node_detection_file = os.path.join(slow_node_detection_path, f"slow_node_result_{response.timestamp}.json") + with open(slow_node_detection_file, "w") as f: + json.dump(response, f) + logger.info("==========finished slow node result record ========") + + +if __name__ == "__main__": + try: + main() + except Exception as e: + response = AIJobDetectResult() + response.error_msg = str(traceback.format_exc()) + response.report_time = str(int(time.time() * 1000)) + logger.info(json.dumps(response)) + logger.error(traceback.format_exc()) + logger.error("Slow node detection error! No Result Return") diff --git a/failslow/failslow/slow_node_detection.py b/failslow/failslow/slow_node_detection.py new file mode 100644 index 0000000000000000000000000000000000000000..dec5f6aeba7444b66d02c87140238c6df81e7488 --- /dev/null +++ b/failslow/failslow/slow_node_detection.py @@ -0,0 +1,603 @@ +# coding=utf-8 +""" +Copyright (c) Huawei Technologies Co., Ltd. 2020-2028. All rights reserved. +Description: +FileName:slow_node_detection.py +Author: h00568282/huangbin +Create Date: 2025/2/26 11:23 +Notes: + +""" +import os +import json + +import pprint +from typing import List, Dict +import pandas as pd +import numpy as np + +from failslow.response import AIJobDetectResult +from failslow.dataloader.marker_data_reader import MarkerDataloader, CommGroup +from failslow.alg import time_node_detectors, space_node_detectors +from failslow.util.logging_utils import get_default_logger +from failslow.util.utils import is_same_list, is_continuous +from failslow.process.post_process import PostProcess +from failslow.util.constant import CommOpType, TableItem +from failslow.dataloader.restore_comm import RestoreComm + +Dataloader = MarkerDataloader +logger = get_default_logger(__name__) + + +class SlowNodeDetector: + def __init__(self, metric_args: Dict, model_args: Dict, start_time=None, end_time=None): + ''' + :param root_path: + :param hccl_domains: {tp:8, dp:2, pp:1} + ''' + root_path = model_args.get("root_path", None) + logger.info(f"Input data: {root_path}.") + self.metric_args = metric_args + self.model_args = model_args + + self._root_path = root_path + self.dataloader = Dataloader(root_path, start_time, end_time) + self.comm_groups: List[CommGroup] = self.dataloader.extract_comm_domain() + self.ranks: List = self.dataloader.ranks + self.hccl_domains = self._init_hccl_domains() + + self.aggregate_method = {} + self.post_process = PostProcess(metric_args, model_args) + self.group_anomaly_detector = GroupAnomalyDetector() + self.enable_detect_type = self.model_args.get("enable_detect_type", {}) + self.fail_slow_ops = self.model_args.get("fail_slow_ops", {}) + + def _init_hccl_domains(self): + hccl_domain_config = self.model_args.get("hccl_domain", {}) + if hccl_domain_config: + hccl_domains = hccl_domain_config + else: + restore_comm = RestoreComm(self.comm_groups, self.ranks) + restore_comm() + hccl_domains = restore_comm.comm_domain + logger.info(f"hccl_domains: {hccl_domains}.") + return 
hccl_domains + + def generate_aggregate_strategy(self, metric_name: str): + def generate_agge_key(_func_method: str, _func_params: Dict): + for key, value in _func_params.items(): + _func_method += f"_{key}-{value}" + + return _func_method + + aggregations = self.metric_args.get(metric_name, {}).get("aggregation", {}) + during_s = aggregations.get("during_s", 5) + self.aggregate_method[metric_name] = { + "during": during_s + } + aggerate_funcs = aggregations.get("funcs", []) + for aggerate_params in aggerate_funcs: + func_method = aggerate_params.get("func", "mean") + func_method_func = getattr(np, func_method) + func_params = aggerate_params.get("func_params", {}) + + if len(aggerate_funcs) == 1: + metric_name_key = metric_name + else: + key = generate_agge_key(func_method, func_params) + metric_name_key = f"{metric_name}!{key}" + + self.aggregate_method[metric_name][metric_name_key] = [func_method_func, func_params] + + def aggregate_by_timestamp(self, df: pd.DataFrame, metric_name: str) -> Dict: + ''' aggregate by start timestamp within during time. + ex: + dur_p90 = np.percentile(group['dur'], 90) + dur_p95 = np.percentile(group['dur'], 95) + dur_mean_value = group['dur'].mean() + ''' + + start_time = df[TableItem.ex_start_ts].min() + + aggregate_method_by_metric_name = self.aggregate_method.get(metric_name, {}) + during_s = aggregate_method_by_metric_name.get("during", 5) + during_ms = during_s * 10 ** 3 + # fake interval + df[TableItem.aggregate_window_size] = ((df[TableItem.ex_start_ts] - start_time) // during_ms) + 1 + grouped = df.groupby(TableItem.aggregate_window_size) + + result = [] + for interval, group in grouped: + start_timestamp = group[TableItem.ex_start_ts].min() + tmp_point_dict = {TableItem.alg_timestamp: start_timestamp} + for metric_name_key, aggerate_funcs in aggregate_method_by_metric_name.items(): + if metric_name_key == "during": + continue + func_method_func = aggerate_funcs[0] + func_params = aggerate_funcs[1] + value = func_method_func(group[TableItem.op_execute], **func_params) + + tmp_point_dict[metric_name_key] = value + + result.append(tmp_point_dict) + + result_df = pd.DataFrame(result) + cols = result_df.columns + + result_dfs = {} + for col in cols: + if col == TableItem.alg_timestamp: + continue + result_dfs[col] = result_df[[TableItem.alg_timestamp, col]] + + return result_dfs + + def plot_step_time(self, data: pd.DataFrame, metric_name: str, rank: int, ext: str = "latency"): + import matplotlib.pyplot as plt + + data = np.array(data) + plt.figure(figsize=(10, 6)) + # 去掉前0.1的数据 + data_len = len(data) + data = data[int(data_len * 0.1):] + plt.plot(data, label='raw_latency', marker='o') + plt.title(f'Rank: {rank}, comm op {metric_name} latency.') + plt.xlabel('index') + plt.ylabel('latency(ms)') + plt.legend() + plt.grid(True) + save_image_path = os.path.join(self._root_path, self.model_args.get("save_image", "image")) + os.makedirs(save_image_path, exist_ok=True) + save_image = os.path.join(save_image_path, f"rank_{rank}-op_{metric_name}-{ext}.png") + plt.savefig(save_image) + + @staticmethod + def output_anomaly_devices(metric: str, anomaly_location: dict): + anomaly_devices = [] + for device_info in anomaly_location.keys(): + # 异常点数大于0, 则认为该指标出现异常 + if np.sum(anomaly_location[device_info][metric][1]) > 0: + anomaly_devices.append(device_info) + + return anomaly_devices + + def preprocess_group_data(self, group_ranks_list: List, comm_name: str, metric_name: str, is_op_launch: bool = False): + ''' + @params: + group_ranks_list: 处理通信组的卡号列表 + comm_name: 
str, 通信组名称 + metric_name: str, 通信算子名称,算子下发带'_launch'后缀,用于metric_config中查找算法配置 + is_op_launch: bool, 判断是算子下发,算子执行 + ''' + # load + if is_op_launch: + group_data = self.dataloader.read_op_launch_df_by_ranks(group_ranks_list) + mspti_metric_name = metric_name.split("_")[0] + else: + group_data = self.dataloader.read_device_df_by_ranks(group_ranks_list) + mspti_metric_name = metric_name + # preprocess + # 1 filter by comm_name + # 2 filter by comm_op + # 3 calculate latency of comm_op + # 4 按照开始时间戳5s内设置step组合 + # 5 裁剪掉最前面step的数据 + new_detect_group_data = [] + length = 0 + for rank_id, data_df in group_data.items(): + data_df = data_df[data_df[TableItem.ex_comm_group] == comm_name] + # ns -> ms + data_df = data_df[data_df[TableItem.ex_comm_op] == mspti_metric_name] + data_df[TableItem.ex_end_ts] = data_df[TableItem.ex_end_ts] / (1e6 * 1.) + data_df[TableItem.ex_start_ts] = data_df[TableItem.ex_start_ts] / (1e6 * 1.) + + data_df[TableItem.op_execute] = abs(data_df[TableItem.ex_end_ts] - data_df[TableItem.ex_start_ts]) + # plot data + if self.model_args.get("use_plot", False): + self.plot_step_time(data_df[TableItem.op_execute], metric_name, rank_id) + + # 250517: 裁剪训练初始step的数据 + data_df = data_df[int(len(data_df) * 0.2):] + aggerated_data_dfs: Dict = self.aggregate_by_timestamp(data_df, metric_name) + if aggerated_data_dfs: + length = len(list(aggerated_data_dfs.values())[0]) + else: + length = 0 + if not new_detect_group_data: + new_detect_group_data = [{} for _ in range(len(aggerated_data_dfs))] + + for index, (agge_matric_name, agge_data_df) in enumerate(aggerated_data_dfs.items()): + if "metric_name" not in new_detect_group_data[index].keys(): + new_detect_group_data[index]["metric_name"] = agge_matric_name + + if "data" not in new_detect_group_data[index].keys(): + new_detect_group_data[index]["data"] = {} + new_detect_group_data[index]["data"][rank_id] = agge_data_df + + return new_detect_group_data, length + + def group_detect_single_kpi(self, metric_name: str, group_ranks_list: List, comm_name: str, + is_group: bool = False, is_op_launch: bool = False) -> List: + detect_datas, data_len = self.preprocess_group_data(group_ranks_list, comm_name, metric_name, is_op_launch=is_op_launch) + + all_results = [] + for detect_data in detect_datas: + detection_results = self.detect_single_aggerate_metric(data_len, detect_data["metric_name"], + detect_data["data"]) + all_results.append(detection_results) + + return all_results + + def detect_single_aggerate_metric(self, min_data_len: int, metric_name: str, detect_data): + anomaly_devices = [] + anomaly_locations = {} + time_anomaly_locations = {} + space_anomaly_locations = {} + metric_name_key = metric_name.split("!")[0] + + detection_results = { + "anomaly_devices": anomaly_devices, + "anomaly_locations": anomaly_locations, + "detect_result_type": "TIME", + "metric_name": metric_name, + "group_data": detect_data, + } + logger.info(f"length: {min_data_len} ***************") + + if min_data_len == 0: + logger.warning("GROUP data contains EMPTY DATA. 
GROUP_DATA:%s", pprint.pformat(detect_data)) + return [detection_results] + # 时间检测 + logger.info("work on %s, %s started.", metric_name, "time node compare") + metric_arg = self.metric_args.get(metric_name_key) + time_detector_arg = metric_arg.get("time_detector") + if time_detector_arg is not None: + time_anomaly_locations = self.group_anomaly_detector.time_node_compare(metric_name, time_detector_arg, + detect_data) + logger.info( + f"time node compare result: {self.output_anomaly_devices(metric_name, time_anomaly_locations)}.") + logger.info("work on %s, %s finished.", metric_name, "time node compare") + + space_detector_arg = metric_arg.get("space_detector") + if space_detector_arg is not None: + # 四个以上的对象才进行均质化 + if len(detect_data) >= 4: + # 空间维度对比,输出异常节点 + space_anomaly_locations = self.group_anomaly_detector.space_nodes_compare(metric_name, + space_detector_arg, + detect_data) + logger.info( + f"space_nodes_compare finish, result: {self.output_anomaly_devices(metric_name, space_anomaly_locations)}.") + else: + logger.info( + f"Skip space nodes compare, due to nodes number {len(detect_data)} is smaller than 4.") + else: + logger.info(f"Skip space nodes compare.") + + # 时间空间结果融合 + anomaly_locations, detect_result_type = self.group_anomaly_detector.time_space_agg(time_anomaly_locations, + space_anomaly_locations, + metric_name) + anomaly_devices = self.output_anomaly_devices(metric_name, anomaly_locations) + detection_results["anomaly_devices"] = anomaly_devices + detection_results["anomaly_locations"] = anomaly_locations + detection_results["detect_result_type"] = detect_result_type + + logger.info(f'''Time and space aggregated result: {anomaly_devices}.''') + logger.info("work on %s, %s end.", metric_name, "slow_node_detection") + + return detection_results + + def get_send_groups(self, metric_name: str) -> List[CommGroup]: + ''' + 获取流水线并行的待检测组 + 规则: + 1 通信组需要包含流水线并行通信算子 + 2 + ''' + pp_groups = self.hccl_domains.get("pp", []) + + send_groups = [] + for pp_group in pp_groups: + for comm_group in self.comm_groups: + ops_list = list(comm_group.count_ops.keys()) + if metric_name in ops_list and is_same_list(comm_group.group_ranks, pp_group): + send_groups.append(comm_group) + + return send_groups + + def merge_group_data(self, all_group_df: Dict, metric_name: str, detect_datas: List, group_ranks_list: List): + ''' + + :param detect_datas: [{"metric_name":..., "data":...}, ...] 
+ :return: + ''' + detect_data = detect_datas[0]["data"] + group_key = f"{group_ranks_list}" + + df_list = list(detect_data.values()) + base_df = df_list[0] + + merged_df = pd.concat([df[metric_name] for df in df_list], axis=1) + merged_df["merge_value"] = merged_df.mean(axis=1, skipna=True) + result_df = pd.DataFrame({ + 'timestamp': base_df['timestamp'], + metric_name: merged_df['merge_value'].values + }) + all_group_df[group_key] = result_df + + def detect_group_slow(self, metric_name): + ''' + step1 selec large comm group from tp or dp, (eg: tp4, dp2, pp4 -> select tp4) + :return: + ''' + self.generate_aggregate_strategy(metric_name) + tp_groups = self.hccl_domains.get("tp", []) + dp_groups = self.hccl_domains.get("dp", []) + dp_size_per_group = len(dp_groups[0]) + tp_size_per_group = len(tp_groups[0]) + if tp_size_per_group == 1: + metric_name = CommOpType.all_reduce + target_groups = dp_groups + else: + target_groups = tp_groups + + all_group_df = {} + all_data_len = 0 + for target_group in target_groups: + for comm_group in self.comm_groups: + comm_name = comm_group.comm_name + group_ranks_list = comm_group.group_ranks + if (not is_same_list(group_ranks_list, target_group)) or ( + not is_continuous(group_ranks_list)): + continue + detect_datas, data_len = self.preprocess_group_data(group_ranks_list, comm_name, metric_name) + all_data_len = data_len + self.merge_group_data(all_group_df, metric_name, detect_datas, group_ranks_list) + + logger.info(f"Starting Comm Group Slow Detect.") + detection_results = self.detect_single_aggerate_metric(all_data_len, metric_name, all_group_df) + logger.info(f"Finishing Comm Group Slow Detect.\n") + + return detection_results + + def detect_cal_slow(self, metric_name: str = CommOpType.reduce_scatter): + ''' comparing tp comm op to find slow card + :param + ''' + self.generate_aggregate_strategy(metric_name) + dp_groups = self.hccl_domains.get("dp", []) + tp_groups = self.hccl_domains.get("tp", []) + if dp_groups: + dp_size_per_group = len(dp_groups[0]) + else: + dp_size_per_group = 0 + if tp_groups: + tp_size_per_group = len(tp_groups[0]) + else: + tp_size_per_group = 0 + + if tp_size_per_group == 1: + target_groups = dp_groups + else: + target_groups = tp_groups + + all_results = [] + for target_group in target_groups: + for comm_group in self.comm_groups: + comm_name = comm_group.comm_name + group_ranks_list = comm_group.group_ranks + if (not is_same_list(group_ranks_list, target_group)) or ( + not is_continuous(group_ranks_list)): + continue + + logger.info(f"Start Calculating Slow Detect in Group {group_ranks_list}.") + group_result = self.group_detect_single_kpi(metric_name, group_ranks_list, comm_name) + logger.info(f"Finishing Calculating Slow Detect in Group {group_ranks_list}.\n") + all_results.extend(group_result) + + return all_results + + def detect_comm_slow(self, metric_name: str = CommOpType.send) -> List[Dict]: + ''' use send/recieve comm op to find slow pair. 
and then aggregate to most anomaly card + src send, dst recieve + ''' + self.generate_aggregate_strategy(metric_name) + group_slow_results = self.detect_group_slow(CommOpType.all_gather) + + all_results = [] + send_groups = self.get_send_groups(metric_name) + for comm_group in send_groups: + comm_name = comm_group.comm_name + group_ranks_list = comm_group.group_ranks + logger.info(f"Start Comm Pair Slow Detect in Group {group_ranks_list}.") + result = self.group_detect_single_kpi(metric_name, group_ranks_list, comm_name) + logger.info(f"Finishing Comm Pair Slow Detect in Group {group_ranks_list}.\n") + # here vote for most anomaly node + merge_result = self.post_process.process_comm_slow_result(result, group_slow_results["anomaly_devices"], + group_ranks_list) + all_results.extend(merge_result) + + return all_results + + def detect_op_launch_slow(self, metric_name): + self.generate_aggregate_strategy(metric_name) + dp_groups = self.hccl_domains.get("dp", []) + tp_groups = self.hccl_domains.get("tp", []) + + if dp_groups: + dp_size_per_group = len(dp_groups[0]) + else: + dp_size_per_group = 0 + if tp_groups: + tp_size_per_group = len(tp_groups[0]) + else: + tp_size_per_group = 0 + if tp_size_per_group == 1: + target_groups = dp_groups + else: + target_groups = tp_groups + + all_results = [] + for target_group in target_groups: + for comm_group in self.comm_groups: + comm_name = comm_group.comm_name + group_ranks_list = comm_group.group_ranks + if (not is_same_list(group_ranks_list, target_group)) or ( + not is_continuous(group_ranks_list)): + continue + + logger.info(f"Start Op Launching Slow Detect in Group {group_ranks_list}.") + group_result = self.group_detect_single_kpi(metric_name, group_ranks_list, comm_name, is_op_launch=True) + logger.info(f"Finishing Op Launching Slow Detect in Group {group_ranks_list}.\n") + all_results.extend(group_result) + + return all_results + + def detect(self) -> AIJobDetectResult: + ''' + detect fail slow type with comm ops: + "cal_slow": "HcclAllGather", + "op_launch_slow": "HcclAllGather_launch", + "comm_slow": "HcclBatchSendRecv" + ''' + all_results = [] + enable_cal = self.enable_detect_type.get("enable_cal", True) + enable_op_launch = self.enable_detect_type.get("enable_op_launch", False) + enable_comm = self.enable_detect_type.get("enable_comm", False) + if enable_cal: + cal_slow_op = self.fail_slow_ops.get("cal_slow", CommOpType.all_gather) + cal_slow_results = self.detect_cal_slow(cal_slow_op) + all_results.extend(cal_slow_results) + if enable_op_launch: + op_launch_slow_op = self.fail_slow_ops.get("op_launch_slow", f"{CommOpType.all_gather}_launch") + op_launch_results = self.detect_op_launch_slow(op_launch_slow_op) + all_results.extend(op_launch_results) + if enable_comm: + if len(self.ranks) >= 8: + ''' default multi node training scene.''' + comm_slow_op = self.fail_slow_ops.get("comm_slow", CommOpType.batch_send_recv) + comm_slow_results = self.detect_comm_slow(comm_slow_op) + all_results.extend(comm_slow_results) + + response, all_anomaly_nodes = self.post_process.gen_final_alarm(all_results) + + return response + + def rm_csv_files(self): + for filename in os.listdir(self._root_path): + file_path = os.path.join(self._root_path, filename) + if os.path.isfile(file_path) and (filename.endswith('op_launch.csv') or filename.endswith('device.csv')): + try: + os.remove(file_path) + except Exception as e: + logger.warning(f"file can not remove {file_path}: {e}") + + def run(self) -> AIJobDetectResult: + response = self.detect() + if not 
self.model_args.get("debug_data", False): + self.rm_csv_files() + + return response + +class GroupAnomalyDetector: + ''' space compare in group, and time compare in single ts ''' + + def __init__(self): + pass + + @staticmethod + def time_space_agg(time_anomaly_locations, space_anomaly_locations, metric_name): + detect_result_type = {} + + for node_id in time_anomaly_locations.keys(): + time_ret = np.sum(time_anomaly_locations[node_id][metric_name][1]) + if space_anomaly_locations: + space_ret = np.sum(space_anomaly_locations[node_id][metric_name][1]) + # 如果均质化没有报错则消除告警 + # 若空间检测和时间检测结果都为空,则返回正常值 + # 若时间维度和空间维度都出现异常,以空间维度为主返回结果 + if space_ret == 0 or (space_ret > 0 and time_ret >= 0): + time_anomaly_locations[node_id][metric_name] = space_anomaly_locations[node_id][metric_name] + detect_result_type.setdefault(node_id, {}).setdefault(metric_name, "SPACE") + else: + detect_result_type.setdefault(node_id, {}).setdefault(metric_name, "TIME") + else: + detect_result_type.setdefault(node_id, {}).setdefault(metric_name, "TIME") + + return time_anomaly_locations, detect_result_type + + def time_node_compare(self, metric_name: str, cfg: Dict, detect_data: Dict): + detector_class = time_node_detectors.get(cfg.get("type")) + time_node_detector = detector_class(metric_name=metric_name, cfg=cfg) + time_node_detector.fit(detect_data) + locations = time_node_detector.predict(detect_data) + expert_alarm_window_size = cfg.get("alarm_filter_window_size") + + for device_info, anomaly_locations in locations.items(): + filter_labels = self.alarm_filter(anomaly_locations[metric_name][1], expert_alarm_window_size) + locations[device_info][metric_name][1][:] = filter_labels + + return locations + + @staticmethod + def alarm_filter(labels, alarm_filter_window_size): + copy_labels = np.zeros(len(labels)) + start_index = alarm_filter_window_size + alarm_points = set() + for i in range(start_index, len(labels) + 1): + is_sequential_alarm = (np.sum(labels[i - alarm_filter_window_size:i]) >= alarm_filter_window_size) + if not is_sequential_alarm: + if np.sum(labels[i - alarm_filter_window_size:i]) > 0: + alarm_points.add(i - alarm_filter_window_size) + else: + copy_labels[i - alarm_filter_window_size:i] = labels[i - alarm_filter_window_size:i] + # if alarm_points: + # logger.info(f"Alert Remove from point loc", list(alarm_points)) + + return copy_labels + + def space_nodes_compare(self, metric_name: str, cfg: Dict, detect_data: Dict): + detector_class = space_node_detectors.get(cfg.get("type")) + space_detector = detector_class(cfg) + df = pd.DataFrame() + column_list = [] + for device_label, infer_data in detect_data.items(): + df[device_label] = infer_data[metric_name] + column_list.append(device_label) + + detect_node_data = df[column_list].values + + labels = space_detector.detect(detect_node_data) + + labels = np.swapaxes(labels, 0, 1) + space_detect_locations = {} + + i = 0 + for device_label in column_list: + space_detect_locations[device_label] = {} + space_detect_locations[device_label][metric_name] = detect_data[device_label]["timestamp"], labels[i] + i += 1 + return space_detect_locations + + +if __name__ == "__main__": + ''' + 感知触发,感知模块发现性能劣化,触发慢节点定界,能保证获取到的数据,前半部分异常,后半部分正常 + 补充感知这块逻辑 + 循环触发,每隔半小时触发一次,时序数据有三种状态,时间序列维度上,全部正常,部分正常-部分异常,全部异常,需增加空间对比 + + ''' + with open("../config/metric_config.json", 'r', encoding='utf-8') as reader: + metric_args = json.load(reader) + with open("../config/model_config.json", 'r', encoding='utf-8') as reader: + model_args = json.load(reader) + + start_time = 1743675836 + 
end_time = 1743691878 + start_time = None + end_time = None + detector = SlowNodeDetector(metric_args, model_args, start_time, end_time) + response: AIJobDetectResult = detector.detect() + + logger.info(f"reponse: {response}") diff --git a/failslow/failslow/util/__init__.py b/failslow/failslow/util/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/failslow/failslow/util/constant.py b/failslow/failslow/util/constant.py new file mode 100644 index 0000000000000000000000000000000000000000..3ceb4ea945862b7c5464aec64c54b93e41cc19fc --- /dev/null +++ b/failslow/failslow/util/constant.py @@ -0,0 +1,112 @@ +# coding=utf-8 +""" +Copyright (c) Huawei Technologies Co., Ltd. 2020-2028. All rights reserved. +Description: +FileName:instant.py +Author: h00568282/huangbin +Create Date: 2025/3/14 15:44 +Notes: + +""" + +from typing import List, Dict + +METRIC_CONFIG_PATH = "/etc/systrace/config/metric_config.json" +MODEL_CONFIG_PATH = "/etc/systrace/config/model_config.json" + +TIMESTAMP_MS_NUM = 1000 +HOUR_TO_SECONDS = 3600 +MS_TO_NS = 1e6 + +class AnomalyType: + fail_slow = "failSlow" + hang = "hang" + +class SlowType: + slow_cal = 0 + slow_comm = 1 + + +class TableItem: + ''' + mspti sample comm op data + Domain,Flag,Id,Kind,Name,SourceKind,Timestamp,msptiObjecId_Ds_DeviceId,msptiObjecId_Ds_StreamId,msptiObjecId_Pt_ProcessId,msptiObjecId_Pt_ThreadId + ''' + domain = "Domain" + flag = "Flag" + id = "Id" + kind = "Kind" + source_kind = "SourceKind" + name = "Name" + timestamp = "Timestamp" + device_id = "msptiObjectId_Ds_DeviceId" + stream_id = "msptiObjectId_Ds_StreamId" + process_id = "msptiObjectId_Pt_ProcessId" + thread_id = "msptiObjectId_Pt_ThreadId" + + ex_start_ts = "Start" + ex_end_ts = "End" + ex_comm_op = "Comm_op" + ex_comm_group = "Comm_group" + ex_data_type = "Data_type" + ex_count = "Count" + + op_execute = "Excute_time" + op_launch = "Launch_time" + + aggregate_window_size = "aggregate_window_size" + alg_timestamp = "timestamp" + + +class CommOpType: + reduce_scatter = "HcclReduceScatter" + all_reduce = "HcclAllreduce" + all_gather = "HcclAllGather" + send = "HcclSend" + receive = "HcclReceive" + batch_send_recv = "HcclBatchSendRecv" + broadcast = "HcclBroadcast" + + +class CommGroup: + def __init__(self, comm_name: str, slice_index: int, comm_op: str, ranks: List, count_ops: Dict) -> None: + self._comm_name = comm_name + self._comm_op = comm_op + self._ranks = ranks if isinstance(ranks, List) else [ranks] + self._slice_index = slice_index + self._count_ops = count_ops + + @property + def comm_name(self) -> str: + return self._comm_name + + @property + def slice_index(self) -> int: + return self._slice_index + + @property + def comm_op(self) -> str: + return self._comm_op + + @property + def group_ranks(self) -> List: + return self._ranks + + def set_group_ranks(self, ranks: List): + self._ranks = ranks + + @property + def count_ops(self) -> Dict: + return self._count_ops + + def extend_group_rank(self, ranks: List): + self._ranks.extend(ranks) + + def __eq__(self, other) -> bool: + if isinstance(other, CommGroup): + return self._comm_name == other._comm_name + + return False + + def __repr__(self): + return f"CommGroup(_comm_name='{self._comm_name}', comm_op={self._comm_op}, _ranks: {self._ranks}, slice_index: {self._slice_index}, count_ops: {self._count_ops})" diff --git a/failslow/failslow/util/logging_config.py b/failslow/failslow/util/logging_config.py new file mode 100644 index 
0000000000000000000000000000000000000000..96970d7d686aa6305d7ecf3fb414ab86aa363f3e --- /dev/null +++ b/failslow/failslow/util/logging_config.py @@ -0,0 +1,27 @@ +# coding=utf-8 +import logging +import os + +LOGGER_LEVEL_ENV = { + "DEBUG": logging.DEBUG, + "INFO": logging.INFO, + "WARN": logging.WARNING, + "ERROR": logging.ERROR +} + +LOG_FILE_ROOT = os.path.join(os.getenv('_APP_LOG_DIR', ""), os.getenv('POD_NAME', ""), 'log') + +LOG_LEVEL = "INFO" +# 压缩文件数量 +MAX_BACKUP_COUNT = 5 +# 单位是兆 +FILE_SIZE = 200 + +PRINT_LOG_TO_CONSOLE = True +MAX_SIZE = FILE_SIZE * 1024 * 1024 +ROTATE_MAX_SIZE = 1 * MAX_SIZE + +LOGGER_CONTENT_FORMAT = "%(asctime)s.%(msecs)03d(%(process)d|%(thread)d)\ +[%(levelname)s][%(module)s:%(lineno)d]%(message)s" + +LOGGER_TIME_FORMAT = "%Y-%m-%d %H:%M:%S" diff --git a/failslow/failslow/util/logging_utils.py b/failslow/failslow/util/logging_utils.py new file mode 100644 index 0000000000000000000000000000000000000000..826a6c9409371d5d9894dee2dc2574f4d2325031 --- /dev/null +++ b/failslow/failslow/util/logging_utils.py @@ -0,0 +1,205 @@ +# coding=utf-8 + +import logging +import logging.handlers +import os +import re +import secrets +import time +import traceback +from datetime import datetime +from zipfile import ZIP_DEFLATED, ZipFile + +from failslow.util import logging_config +from failslow.util.logging_config import MAX_SIZE, MAX_BACKUP_COUNT, LOG_LEVEL, LOG_FILE_ROOT, PRINT_LOG_TO_CONSOLE + +__DEFAULT_LOGGERS = dict() + +root_path = os.path.dirname(os.path.dirname(__file__)) + +formatter = logging.Formatter( + logging_config.LOGGER_CONTENT_FORMAT, + logging_config.LOGGER_TIME_FORMAT) + + +def get_default_logger(module="default", log_path=None) -> logging.Logger: + global __DEFAULT_LOGGERS + if not __DEFAULT_LOGGERS.get(module): + __DEFAULT_LOGGERS[module] = get_logger(module=module, + log_path=log_path or os.path.join(LOG_FILE_ROOT, + "failslow_localization.log"), + max_file_size=MAX_SIZE, + max_backup_count=MAX_BACKUP_COUNT) + + return __DEFAULT_LOGGERS.get(module) + + +def get_logger(module, log_path, max_file_size, max_backup_count): + logger = logging.getLogger(module) + if len(logger.handlers) == 0: + logger.propagate = 0 + logger.setLevel(LOG_LEVEL) + if PRINT_LOG_TO_CONSOLE: + handler = logging.StreamHandler() + handler.setFormatter(formatter) + else: + handler = get_log_handler(log_path, max_file_size, max_backup_count) + logger.addHandler(handler) + return logger + + +def get_dummy_logger(): + logger = logging.getLogger() + if len(logger.handlers) == 0: + logger.propagate = 0 + logger.setLevel(LOG_LEVEL) + """ + When the permission of the log directory is 000, + to avoid the execution of the command line, + the log is printed to the console. + """ + logger.addHandler(logging.NullHandler()) + return logger + + +def uniep_log_namer(name): + return name + ".zip" + + +def uniep_log_rotator(source, dest): + """ + Dump function, the main logic is as follows: + 1. First compress the file + 2. According to the dump file, get the final file content, + and write to the source file to prevent the loss of file content. + """ + try: + with ZipFile(dest, "w", ZIP_DEFLATED) as archived_file: + archived_file.write(source, os.path.basename(source)) + if os.path.getsize(source) > 2 * logging_config.ROTATE_MAX_SIZE: + # Abnormal scenes directly clear the file. Avoid unlimited log growth. + os.remove(source) + except Exception: + # Abnormal scenes directly clear the file. Avoid unlimited log growth. 
+ os.remove(source) + + +class LockFile: + def __init__(self, roll_tag_file_path): + self._roll_tag_file_path = roll_tag_file_path + + def __enter__(self): + with os.fdopen(os.open(self._roll_tag_file_path, os.O_WRONLY | os.O_CREAT, 0o640), "w"): + pass + + def __exit__(self, exc_type, exc_val, exc_tb): + if os.path.exists(self._roll_tag_file_path): + os.remove(self._roll_tag_file_path) + + +def get_log_handler(log_path, max_file_size, max_backup_count): + sop_log_handler = None + try: + log_dir = os.path.dirname(log_path) + os.makedirs(log_dir, 0o750, exist_ok=True) + sop_log_handler = ConcurrentRotatingFileHandler(log_path, + max_bytes=max_file_size, + backup_count=max_backup_count) + sop_log_handler.rotator = uniep_log_rotator + sop_log_handler.namer = uniep_log_namer + sop_log_handler.setFormatter(formatter) + except BaseException: + get_dummy_logger().exception('Get LOGGER filed, used stdout instead') + return sop_log_handler + + +class ConcurrentRotatingFileHandler(logging.handlers.RotatingFileHandler, + logging.handlers.WatchedFileHandler): + def __init__(self, log_path, max_bytes=20, backup_count=5): + logging.handlers.RotatingFileHandler.__init__(self, log_path, + maxBytes=max_bytes, + backupCount=backup_count) + logging.handlers.WatchedFileHandler.__init__(self, log_path) + + self._sop_file_size = max_bytes + self.dev, self.ino = -1, -1 + log_dir, log_full_name = os.path.split(log_path) + + self.log_dir = log_dir + self.log_full_name = log_full_name + self._roll_tag_file_path = os.path.join(log_dir, "ROLLING_TAG") + self._statstream() + + @staticmethod + def get_zip_log_files(log_dir, log_full_name): + log_name = log_full_name.rstrip(".log") + watched_backup_file = [] + create_time_dict = {} + for file_name in os.listdir(log_dir): + if re.match(log_name + r"_\d+\.zip", file_name): + path = os.path.join(log_dir, file_name) + watched_backup_file.append(path) + search_result = re.search(r"(\d+)", file_name) + create_time_dict[path] = search_result.group() + + watched_backup_file = sorted(watched_backup_file, key=lambda x: create_time_dict[x]) + return watched_backup_file + + def doRollover(self): + if self.stream: + self.stream.close() + self.stream = None + max_try = 0 + if self.backupCount > 0: + while max_try < 10: + if os.path.exists(self.baseFilename): + _rotator_file_size = os.path.getsize(self.baseFilename) + else: + with os.fdopen(os.open(self.baseFilename, os.O_WRONLY | os.O_CREAT, 0o640), "w"): + pass + _rotator_file_size = 0 + if _rotator_file_size < logging_config.ROTATE_MAX_SIZE: + return + else: + # 避免多个进程同时创建了lock_file + for i in range(10): + time.sleep(secrets.randbelow(100) / 1000) # 生成0到0.099之间的随机浮点数 + while os.path.exists(self._roll_tag_file_path): + time.sleep(max(0.5, secrets.randbelow(1000) / 1000)) # 生成0到0.999之间的随机浮点数,至少为0.5 + max_try += 1 + else: + try: + self.rotate_backup_dfn() + except Exception: + if not os.path.exists(self.baseFilename): + with os.fdopen(os.open(self.baseFilename, os.O_WRONLY | os.O_CREAT, 0o640), "w"): + pass + with os.fdopen(os.open(self.baseFilename, os.O_WRONLY | os.O_CREAT, 0o640), "r+") as f: + f.write(traceback.print_exc()) + + if not self.delay: + self.stream = self._open() + + def rotate_backup_dfn(self): + with LockFile(self._roll_tag_file_path): + now = datetime.now().strftime('%Y%m%d%H%M%S%f')[:-3] + current_zip_files = self.get_zip_log_files(self.log_dir, self.log_full_name) + while len(current_zip_files) >= self.backupCount: + oldest_backup = current_zip_files[0] + if os.path.exists(oldest_backup): + 
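# drop the oldest archives first so the total number of zips stays within backupCount +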
os.remove(oldest_backup) + current_zip_files = self.get_zip_log_files(self.log_dir, self.log_full_name) + # 多进程下 + name, suffix = os.path.splitext(self.baseFilename) + os.rename(self.baseFilename, name + "_" + now + suffix) + if not os.path.exists(self.baseFilename): + with os.fdopen(os.open(self.baseFilename, os.O_WRONLY | os.O_CREAT, 0o640), "w"): + pass + backup_dfn = self.rotation_filename(name + "_" + now) + self.rotate(name + "_" + now + suffix, backup_dfn) + if os.path.exists(name + "_" + now + suffix): + os.remove(name + "_" + now + suffix) + + def emit(self, record): + self.reopenIfNeeded() + super().emit(record) diff --git a/failslow/failslow/util/utils.py b/failslow/failslow/util/utils.py new file mode 100644 index 0000000000000000000000000000000000000000..c260167f2fc5f6cb8ac603af9ba436aefd042cc0 --- /dev/null +++ b/failslow/failslow/util/utils.py @@ -0,0 +1,108 @@ +# coding=utf-8 +import json +import logging +import threading +import time +from functools import wraps + +import numpy as np + +from failslow.util.logging_utils import get_default_logger + +LOGGER = get_default_logger(__name__) + + +def typed_property(name, expected_type, strict_type_check=True): + """create property for class and check types.""" + storage_name = '_' + name + + @property + def prop(self): + result = getattr(self, storage_name, None) + msg = "property '{}' of instance '{}' hasn't been set. And returning None.".format(name, type(self)) + if result is None: + logging.warning(msg) + return result + + @prop.setter + def prop(self, value): + msg = "property '{}' of instance '{}' must be a {}, but got %s with type {}" + msg = msg.format(name, type(self), expected_type, value, type(value)) + if hasattr(self, "__hijack_type_check__") and self.__hijack_type_check__: + setattr(self, storage_name, value) + elif strict_type_check: + if isinstance(value, expected_type): + setattr(self, storage_name, value) + else: + raise ValueError(msg) + else: + if not isinstance(value, expected_type): + LOGGER.warning(msg) + setattr(self, storage_name, value) + + return prop + + +class Thread(threading.Thread): + """ + Rewrite the thread in the official hreading so that the return value + of the function can be obtained. + """ + _results = None + + def run(self): + """Method representing the thread's activity.""" + try: + if self._target: + self._results = self._target(*self._args, **self._kwargs) + finally: + # Avoid a refcycle if the thread is running a function with + # an argument that has a member that points to the thread. 
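+ # same cleanup as the standard library's threading.Thread.run()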
+ del self._target, self._args, self._kwargs + + def get_results(self, timeout=None): + self.join(timeout=timeout) + return self._results + + +def cal_time(log_obj: logging.Logger, logger_level="info"): + def _cal_time(func): + @wraps(func) + def _wrap(*args, **kwargs): + t0 = time.time() + res = func(*args, **kwargs) + t1 = time.time() + msg = f"function named '{func.__name__} cost {t1 - t0:}s" + getattr(log_obj, logger_level)(msg) + return res + + return _wrap + + return _cal_time + + +class NpEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, np.integer): + return int(obj) + elif isinstance(obj, np.floating): + return float(obj) + elif isinstance(obj, np.ndarray): + return obj.tolist() + else: + return super(NpEncoder, self).default(obj) + +def is_same_list(list1, list2): + return sorted(list1) == sorted(list2) + + +def is_continuous(rank_list): + if not rank_list: + return True + + sorted_list = sorted(rank_list) + for i in range(len(sorted_list) - 1): + # 检查每一对相邻元素之间的差是否为1 + if sorted_list[i + 1] - sorted_list[i] != 1: + return False + return True diff --git a/failslow/requirements.txt b/failslow/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..4c822aeb457f3cecfcade0b16526b49aa3e6fdde --- /dev/null +++ b/failslow/requirements.txt @@ -0,0 +1,8 @@ +numpy==1.22.4 +PyYAML==5.3 +pandas==2.2.2 +setuptools==63.2.0 +xarray==2023.10.0 +matplotlib +scikit_learn==0.24.2 +joblib==0.14.1 \ No newline at end of file diff --git a/failslow/service/systrace-failslow.service b/failslow/service/systrace-failslow.service new file mode 100644 index 0000000000000000000000000000000000000000..f6c14f6d896fbe78df9f408da0331538c07ac5fd --- /dev/null +++ b/failslow/service/systrace-failslow.service @@ -0,0 +1,12 @@ +[Unit] +Description=A-Ops fail slow detection service +After=network.target + +[Service] +Type=exec +ExecStart=/usr/bin/systrac-failslow +Restart=on-failure +RestartSec=1 + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/failslow/setup.py b/failslow/setup.py new file mode 100644 index 0000000000000000000000000000000000000000..6ae1e0b651240660572b4f1c1d6e1515171fe519 --- /dev/null +++ b/failslow/setup.py @@ -0,0 +1,55 @@ +#!/usr/bin/python3 +# ****************************************************************************** +# Copyright (c) 2022 Huawei Technologies Co., Ltd. +# gala-anteater is licensed under Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. 
+# ******************************************************************************/ + +from glob import glob + +from setuptools import setup, find_packages +import os + +# clean up config files left by previous installs +cfg_path = "/etc/systrace" +for root, dirs, files in os.walk(cfg_path): + for file in files: + os.remove(os.path.join(root, file)) + +# remove the previously installed service unit, if any +ser = "/usr/lib/systemd/system/systrace-failslow.service" +if os.path.isfile(ser): + os.remove(ser) + +setup( + name="systrace_failslow", + version="1.0.0", + author="bin huang", + author_email="huangbin58@huawei.com", + description="Fail Slow Detection for AI Model Training and Inference", + url="https://gitee.com/openeuler/sysTrace", + keywords=["Fail Slow Detection", "Group Compare", "AI Model"], + packages=find_packages(where=".", exclude=("tests", "tests.*")), + data_files=[ + ('/etc/systrace/config/', glob('config/metric_config.json')), + ('/etc/systrace/config/', glob('config/model_config.json')), + ('/usr/lib/systemd/system/', glob('service/*')), + ], + install_requires=[ + "numpy", + "pandas", + "matplotlib", + "joblib", + "scikit_learn", + ], + entry_points={ + "console_scripts": [ + "systrace-failslow=failslow.main:main", + ] + } +)
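Usage note: the `systrace-failslow` entry point simply runs `failslow.main:main`. The snippet below is a minimal sketch of driving the same flow from Python, assuming the configs are installed under /etc/systrace/config as declared in data_files (paths are taken from `failslow.util.constant`):

    import json

    from failslow.slow_node_detection import SlowNodeDetector
    from failslow.util.constant import METRIC_CONFIG_PATH, MODEL_CONFIG_PATH

    # load the detection algorithm config and the task/model config
    with open(METRIC_CONFIG_PATH, 'r', encoding='utf-8') as reader:
        metric_args = json.load(reader)
    with open(MODEL_CONFIG_PATH, 'r', encoding='utf-8') as reader:
        model_args = json.load(reader)

    # start/end timestamps are optional; the package's own examples pass None for both
    detector = SlowNodeDetector(metric_args, model_args, None, None)
    response = detector.run()   # AIJobDetectResult with the slow-node verdict
    print(response)

The same response object is what main() serializes to slow_node_result_<timestamp>.json under slow_node_detection_path.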