# coding=utf-8
"""
Copyright (c) Huawei Technologies Co., Ltd. 2020-2028. All rights reserved.

Description: Linear-trend detector for time-series slow-node analysis.
    Normalizes a series, drops outliers with DBSCAN, fits y = weight*x + bias
    with linear regression, and flags the series as anomalous when the fitted
    slope exceeds a configurable threshold.
FileName: linear_detector.py
Create Date: 2024/8/1 15:06
Notes:
"""
import numpy as np
from collections import Counter
from typing import List

try:
    # Project logger when running inside the failslow package.
    from failslow.util.logging_utils import get_default_logger
    logger = get_default_logger(__name__)
except ImportError:
    # Fallback so the module (and its main() demo) also works standalone.
    import logging
    logger = logging.getLogger(__name__)


class LinearDetector:
    """Flags a time series as anomalous when its overall linear slope is too steep.

    Pipeline: min-max normalize -> DBSCAN outlier removal -> least-squares
    linear fit -> threshold the slope.  ``detect()`` returns False for an
    anomalous (steeply trending) series and True for a normal one.
    """

    def __init__(self, cfg: dict = None):
        # Bug fix: the original declared cfg=None as default but then called
        # cfg.get(...), crashing when cfg was actually omitted.
        cfg = cfg or {}
        # Filled by fit(): {"weight": slope, "bias": intercept}.
        self.detector = None
        # Slope threshold above which the series is considered anomalous.
        self.thr = cfg.get("linear_detector_thr", 0.1)

    def min_max_processing(self, ts_data: np.ndarray) -> np.ndarray:
        """Min-max scale ``ts_data`` to roughly [0, 1).

        A constant series maps to all zeros (avoids division by zero); the
        1e-5 epsilon keeps the divisor strictly positive otherwise.
        """
        min_v = np.min(ts_data)
        max_v = np.max(ts_data)
        if min_v == max_v:
            return ts_data - min_v
        return (ts_data - min_v) / (max_v - min_v + 1e-5)

    def filter_outlier(self, x: np.ndarray, raw_data: np.ndarray) -> List[np.ndarray]:
        """Keep only the points belonging to the dominant DBSCAN cluster.

        Returns ``[x_kept, data_kept]`` with both arrays filtered by the
        same mask, so positions stay aligned for the regression.
        """
        # Imported lazily so importing this module does not require sklearn;
        # only fitting does.
        from sklearn.cluster import DBSCAN

        eps = 0.01
        min_samples = 5
        dbscan = DBSCAN(eps=eps, min_samples=min_samples)

        raw_data = raw_data.reshape(-1, 1)
        # Bug fix: the original called dbscan.fit() and then fit_predict(),
        # clustering the same data twice; fit_predict alone fits and labels.
        predictions = dbscan.fit_predict(raw_data)
        cluster_counts = Counter(predictions)
        most_common_cluster = cluster_counts.most_common(1)[0][0]
        keep_mask = predictions == most_common_cluster

        filtered_data = raw_data.flatten()[keep_mask]
        x = x[keep_mask]

        return [x, filtered_data]

    def fit(self, data: np.ndarray) -> None:
        """Fit y = weight*x + bias on the normalized, outlier-filtered series.

        Stores the result in ``self.detector`` for later ``detect()`` calls.
        """
        from sklearn.linear_model import LinearRegression

        pre_data = self.min_max_processing(data)
        x = np.arange(pre_data.size)

        x, filtered_data = self.filter_outlier(x, pre_data)
        x = self.min_max_processing(x)
        filtered_data = self.min_max_processing(filtered_data)

        # Bug fix: the original passed shapes as extra positional args with no
        # format placeholders, which raises a formatting error inside logging.
        logger.info("linear input and output: %s %s", x.shape, filtered_data.shape)
        x = x.reshape(-1, 1)
        filtered_data = filtered_data.reshape(-1, 1)

        model = LinearRegression()
        model.fit(x, filtered_data)
        weight = model.coef_.flatten()[0]
        bias = model.intercept_[0]
        logger.info("linear model y=ax+b, weight: %s, bias:%s.", weight, bias)

        self.detector = {"weight": weight, "bias": bias}

    def get_anomaly_info(self) -> dict:
        """Return the fitted slope, the threshold, and the decision rule text."""
        anomaly_info = {
            "linear_weight": self.detector.get("weight", None),
            "linear_detector_threshold": self.thr,
        }
        anomaly_info["rule"] = (
            f"if linear_weight:{anomaly_info.get('linear_weight')} is larger than "
            f"linear_detector_threshold: {anomaly_info.get('linear_detector_threshold')}, "
            f"time series is anomaly, else is normal."
        )
        return anomaly_info

    def detect(self) -> bool:
        """Return True when the series looks normal, False when anomalous.

        Must be called after ``fit()``; anomalous means the fitted slope
        exceeds ``self.thr``.
        """
        weight = self.detector.get("weight", None)
        # `is not None` replaces the original truthiness test; identical for
        # every reachable value (0.0 is never > thr) but clearer in intent.
        if weight is not None and weight > self.thr:
            logger.info(f"linear weight: {weight}, status is anomaly.")
            return False
        else:
            logger.info(f"linear weight: {weight}, status is normal.")
            return True


def main():
    """Small demo: a flat series (normal) and a rising series (anomalous)."""
    data = np.ones(360) * 1650
    data[200:250] -= 800
    data[100:150] += 400

    data2 = np.arange(360) * 1650
    data2[200:250] -= 800

    noise = np.random.rand(360) * 180
    data2 = data2 + noise

    detector = LinearDetector(cfg={})
    detector.fit(data)
    detector.detect()

    # Bug fix: data2 (the trending example) was built but never used.
    trend_detector = LinearDetector()
    trend_detector.fit(data2)
    trend_detector.detect()


if __name__ == "__main__":
    main()