From 68c319012a029d3c6c1413b513b155651246c6cf Mon Sep 17 00:00:00 2001 From: zhuofeng <1107893276@qq.com> Date: Sun, 26 Jan 2025 03:53:49 +0000 Subject: [PATCH] add avg_block_io and ai_block_io Signed-off-by: zhuofeng <1107893276@qq.com> --- add-avg_block_io-and-ai_block_io.patch | 2962 ++++++++++++++++++++++++ sysSentry.spec | 90 +- 2 files changed, 3051 insertions(+), 1 deletion(-) create mode 100644 add-avg_block_io-and-ai_block_io.patch diff --git a/add-avg_block_io-and-ai_block_io.patch b/add-avg_block_io-and-ai_block_io.patch new file mode 100644 index 0000000..723eaa1 --- /dev/null +++ b/add-avg_block_io-and-ai_block_io.patch @@ -0,0 +1,2962 @@ +From 55f001f2dfcebdabdf82502b91b4c46a77e34f62 Mon Sep 17 00:00:00 2001 +From: zhuofeng +Date: Fri, 24 Jan 2025 11:56:41 +0800 +Subject: [PATCH] add avg_block_io and ai_block_io + +--- + config/plugins/ai_block_io.ini | 39 + + config/plugins/avg_block_io.ini | 40 + + config/tasks/ai_block_io.mod | 7 + + config/tasks/avg_block_io.mod | 7 + + selftest/test/test_ai_block_io.py | 165 ++++ + src/python/sentryPlugins/__init__.py | 0 + .../sentryPlugins/ai_block_io/README.md | 1 + + .../sentryPlugins/ai_block_io/__init__.py | 0 + .../sentryPlugins/ai_block_io/ai_block_io.py | 239 ++++++ + .../sentryPlugins/ai_block_io/alarm_report.py | 80 ++ + .../ai_block_io/config_parser.py | 742 ++++++++++++++++++ + .../sentryPlugins/ai_block_io/data_access.py | 127 +++ + .../sentryPlugins/ai_block_io/detector.py | 156 ++++ + .../sentryPlugins/ai_block_io/io_data.py | 54 ++ + .../ai_block_io/sliding_window.py | 129 +++ + .../sentryPlugins/ai_block_io/threshold.py | 178 +++++ + src/python/sentryPlugins/ai_block_io/utils.py | 73 ++ + .../sentryPlugins/avg_block_io/__init__.py | 0 + .../avg_block_io/avg_block_io.py | 189 +++++ + .../sentryPlugins/avg_block_io/config.py | 208 +++++ + .../sentryPlugins/avg_block_io/module_conn.py | 145 ++++ + .../avg_block_io/stage_window.py | 55 ++ + .../sentryPlugins/avg_block_io/utils.py | 140 ++++ + 24 files changed, 2778 insertions(+), 1 deletion(-) + create mode 100644 config/plugins/ai_block_io.ini + create mode 100644 config/plugins/avg_block_io.ini + create mode 100644 config/tasks/ai_block_io.mod + create mode 100644 config/tasks/avg_block_io.mod + create mode 100644 selftest/test/test_ai_block_io.py + create mode 100644 src/python/sentryPlugins/__init__.py + create mode 100644 src/python/sentryPlugins/ai_block_io/README.md + create mode 100644 src/python/sentryPlugins/ai_block_io/__init__.py + create mode 100644 src/python/sentryPlugins/ai_block_io/ai_block_io.py + create mode 100644 src/python/sentryPlugins/ai_block_io/alarm_report.py + create mode 100644 src/python/sentryPlugins/ai_block_io/config_parser.py + create mode 100644 src/python/sentryPlugins/ai_block_io/data_access.py + create mode 100644 src/python/sentryPlugins/ai_block_io/detector.py + create mode 100644 src/python/sentryPlugins/ai_block_io/io_data.py + create mode 100644 src/python/sentryPlugins/ai_block_io/sliding_window.py + create mode 100644 src/python/sentryPlugins/ai_block_io/threshold.py + create mode 100644 src/python/sentryPlugins/ai_block_io/utils.py + create mode 100644 src/python/sentryPlugins/avg_block_io/__init__.py + create mode 100644 src/python/sentryPlugins/avg_block_io/avg_block_io.py + create mode 100644 src/python/sentryPlugins/avg_block_io/config.py + create mode 100644 src/python/sentryPlugins/avg_block_io/module_conn.py + create mode 100644 src/python/sentryPlugins/avg_block_io/stage_window.py + create mode 100644 
src/python/sentryPlugins/avg_block_io/utils.py + +diff --git a/config/plugins/ai_block_io.ini b/config/plugins/ai_block_io.ini +new file mode 100644 +index 0000000..69f44ba +--- /dev/null ++++ b/config/plugins/ai_block_io.ini +@@ -0,0 +1,39 @@ ++[log] ++level=info ++ ++[common] ++period_time=1 ++disk=default ++stage=default ++iotype=read,write ++ ++[algorithm] ++train_data_duration=24 ++train_update_duration=2 ++algorithm_type=boxplot ++boxplot_parameter=1.5 ++win_type=not_continuous ++win_size=30 ++win_threshold=6 ++ ++[latency_sata_ssd] ++read_avg_lim=10000 ++write_avg_lim=10000 ++read_tot_lim=50000 ++write_tot_lim=50000 ++ ++[latency_nvme_ssd] ++read_avg_lim=10000 ++write_avg_lim=10000 ++read_tot_lim=50000 ++write_tot_lim=50000 ++ ++[latency_sata_hdd] ++read_avg_lim=15000 ++write_avg_lim=15000 ++read_tot_lim=50000 ++write_tot_lim=50000 ++ ++[iodump] ++read_iodump_lim=0 ++write_iodump_lim=0 +\ No newline at end of file +diff --git a/config/plugins/avg_block_io.ini b/config/plugins/avg_block_io.ini +new file mode 100644 +index 0000000..3b4ee33 +--- /dev/null ++++ b/config/plugins/avg_block_io.ini +@@ -0,0 +1,40 @@ ++[log] ++level=info ++ ++[common] ++disk=default ++stage=default ++iotype=read,write ++period_time=1 ++ ++[algorithm] ++win_size=30 ++win_threshold=6 ++ ++[latency_nvme_ssd] ++read_avg_lim=10000 ++write_avg_lim=10000 ++read_avg_time=3 ++write_avg_time=3 ++read_tot_lim=50000 ++write_tot_lim=50000 ++ ++[latency_sata_ssd] ++read_avg_lim=10000 ++write_avg_lim=10000 ++read_avg_time=3 ++write_avg_time=3 ++read_tot_lim=50000 ++write_tot_lim=50000 ++ ++[latency_sata_hdd] ++read_avg_lim=15000 ++write_avg_lim=15000 ++read_avg_time=3 ++write_avg_time=3 ++read_tot_lim=50000 ++write_tot_lim=50000 ++ ++[iodump] ++read_iodump_lim=0 ++write_iodump_lim=0 +diff --git a/config/tasks/ai_block_io.mod b/config/tasks/ai_block_io.mod +new file mode 100644 +index 0000000..82f4f0b +--- /dev/null ++++ b/config/tasks/ai_block_io.mod +@@ -0,0 +1,7 @@ ++[common] ++enabled=yes ++task_start=/usr/bin/python3 /usr/bin/ai_block_io ++task_stop=pkill -f /usr/bin/ai_block_io ++type=oneshot ++alarm_id=1002 ++alarm_clear_time=5 +\ No newline at end of file +diff --git a/config/tasks/avg_block_io.mod b/config/tasks/avg_block_io.mod +new file mode 100644 +index 0000000..bcd063b +--- /dev/null ++++ b/config/tasks/avg_block_io.mod +@@ -0,0 +1,7 @@ ++[common] ++enabled=yes ++task_start=/usr/bin/python3 /usr/bin/avg_block_io ++task_stop=pkill -f /usr/bin/avg_block_io ++type=oneshot ++alarm_id=1002 ++alarm_clear_time=5 +diff --git a/selftest/test/test_ai_block_io.py b/selftest/test/test_ai_block_io.py +new file mode 100644 +index 0000000..c762c82 +--- /dev/null ++++ b/selftest/test/test_ai_block_io.py +@@ -0,0 +1,165 @@ ++# coding: utf-8 ++# Copyright (c) 2024 Huawei Technologies Co., Ltd. ++# sysSentry is licensed under the Mulan PSL v2. ++# You can use this software according to the terms and conditions of the Mulan PSL v2. ++# You may obtain a copy of Mulan PSL v2 at: ++# http://license.coscl.org.cn/MulanPSL2 ++# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR ++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR ++# PURPOSE. ++# See the Mulan PSL v2 for more details. 
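++# The reference helpers below recompute, independently of the plugin code, the
++# thresholds that the threshold algorithms are expected to produce:
++#
++#   boxplot threshold = Q3 + parameter * (Q3 - Q1)
++#   n-sigma threshold = mean + parameter * std
++#
++# A worked example with data_list = [20, 20, 20, 30, 10] (used throughout these
++# tests): numpy's default linear interpolation gives Q1 = Q3 = 20, so the
++# boxplot threshold with parameter 1.5 is 20.0; the mean is 20 and the
++# population std is sqrt(40) ~= 6.32, so the 3-sigma threshold is ~38.97.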
++
++import unittest
++import numpy as np
++
++from sentryPlugins.ai_block_io.threshold import AbsoluteThreshold, BoxplotThreshold, NSigmaThreshold
++from sentryPlugins.ai_block_io.sliding_window import (NotContinuousSlidingWindow,
++                                                      ContinuousSlidingWindow, MedianSlidingWindow)
++
++
++def _get_boxplot_threshold(data_list: list, parameter):
++    q1 = np.percentile(data_list, 25)
++    q3 = np.percentile(data_list, 75)
++    iqr = q3 - q1
++    return q3 + parameter * iqr
++
++
++def _get_n_sigma_threshold(data_list: list, parameter):
++    mean = np.mean(data_list)
++    std = np.std(data_list)
++    return mean + parameter * std
++
++
++class Test(unittest.TestCase):
++    @classmethod
++    def setUpClass(cls):
++        print("UnitTest Begin...")
++
++    @classmethod
++    def tearDownClass(cls):
++        print("UnitTest End...")
++
++    def setUp(self):
++        print("Begin...")
++
++    def tearDown(self):
++        print("End...")
++
++    def test_absolute_threshold(self):
++        absolute = AbsoluteThreshold()
++        self.assertEqual(None, absolute.get_threshold())
++        self.assertFalse(absolute.is_abnormal(5000))
++        absolute.set_threshold(40)
++        self.assertEqual(40, absolute.get_threshold())
++        self.assertTrue(absolute.is_abnormal(50))
++
++    def test_boxplot_threshold(self):
++        boxplot = BoxplotThreshold(1.5, 5, 1)
++        # Stage 1: not initialized yet
++        self.assertEqual(None, boxplot.get_threshold())
++        self.assertFalse(boxplot.is_abnormal(5000))
++        # after 5 elements are pushed into the boxplot, a threshold is generated
++        data_list = [20, 20, 20, 30, 10]
++        for data in data_list:
++            boxplot.push_latest_data_to_queue(data)
++        # Stage 2: initialized
++        boxplot_threshold = boxplot.get_threshold()
++        self.assertEqual(_get_boxplot_threshold(data_list, 1.5), boxplot_threshold)
++        self.assertTrue(boxplot.is_abnormal(5000))
++        data_list.pop(0)
++        data_list.append(100)
++        boxplot.push_latest_data_to_queue(100)
++        # Stage 3: threshold updated
++        boxplot_threshold = boxplot.get_threshold()
++        self.assertEqual(_get_boxplot_threshold(data_list, 1.5), boxplot_threshold)
++
++    def test_n_sigma_threshold(self):
++        n_sigma = NSigmaThreshold(3, 5, 1)
++        self.assertEqual(None, n_sigma.get_threshold())
++        self.assertFalse(n_sigma.is_abnormal(5000))
++        data_list = [20, 20, 20, 30, 10]
++        for data in data_list:
++            n_sigma.push_latest_data_to_queue(data)
++        n_sigma_threshold = n_sigma.get_threshold()
++        self.assertEqual(_get_n_sigma_threshold(data_list, 3), n_sigma_threshold)
++        self.assertTrue(n_sigma.is_abnormal(5000))
++        data_list.pop(0)
++        data_list.append(100)
++        n_sigma.push_latest_data_to_queue(100)
++        # Stage 3: threshold updated
++        n_sigma_threshold = n_sigma.get_threshold()
++        self.assertEqual(_get_n_sigma_threshold(data_list, 3), n_sigma_threshold)
++
++    def test_not_continuous_sliding_window(self):
++        not_continuous = NotContinuousSlidingWindow(5, 3)
++        boxplot_threshold = BoxplotThreshold(1.5, 10, 8)
++        boxplot_threshold.attach_observer(not_continuous)
++        data_list1 = [19, 20, 20, 20, 20, 20, 22, 24, 23, 20]
++        for data in data_list1:
++            boxplot_threshold.push_latest_data_to_queue(data)
++            result = not_continuous.is_slow_io_event(data)
++            self.assertFalse(result[0][0])
++        self.assertEqual(23.75, boxplot_threshold.get_threshold())
++        boxplot_threshold.push_latest_data_to_queue(24)
++        result = not_continuous.is_slow_io_event(24)
++        self.assertFalse(result[0][0])
++        boxplot_threshold.push_latest_data_to_queue(25)
++        result = not_continuous.is_slow_io_event(25)
++        self.assertTrue(result[0][0])
++        data_list2 = [20, 20, 20, 20, 20, 20]
++        for data in data_list2:
++            boxplot_threshold.push_latest_data_to_queue(data)
++            result = not_continuous.is_slow_io_event(data)
++            self.assertFalse(result[0][0])
++        self.assertEqual(25.625, boxplot_threshold.get_threshold())
++
++    def test_continuous_sliding_window(self):
++        continuous = ContinuousSlidingWindow(5, 3)
++        boxplot_threshold = BoxplotThreshold(1.5, 10, 8)
++        boxplot_threshold.attach_observer(continuous)
++        data_list = [19, 20, 20, 20, 20, 20, 22, 24, 23, 20]
++        for data in data_list:
++            boxplot_threshold.push_latest_data_to_queue(data)
++            result = continuous.is_slow_io_event(data)
++            self.assertFalse(result[0][0])
++        self.assertEqual(23.75, boxplot_threshold.get_threshold())
++        # not yet three abnormal points
++        self.assertFalse(continuous.is_slow_io_event(25)[0][0])
++        # three abnormal points, but not consecutive
++        self.assertFalse(continuous.is_slow_io_event(25)[0][0])
++        # three consecutive abnormal points
++        self.assertTrue(continuous.is_slow_io_event(25)[0][0])
++
++    def test_median_sliding_window(self):
++        median = MedianSlidingWindow(5, 3)
++        absolute_threshold = AbsoluteThreshold(10, 8)
++        absolute_threshold.attach_observer(median)
++        absolute_threshold.set_threshold(24.5)
++        data_list = [24, 24, 24, 25, 25]
++        for data in data_list:
++            self.assertFalse(median.is_slow_io_event(data)[0][0])
++        self.assertTrue(median.is_slow_io_event(25)[0][0])
++
++    def test_parse_collect_data(self):
++        collect = {
++            "read": [1.0, 2.0, 3.0, 4.0],
++            "write": [5.0, 6.0, 7.0, 8.0],
++            "flush": [9.0, 10.0, 11.0, 12.0],
++            "discard": [13.0, 14.0, 15.0, 16.0],
++        }
++        from sentryPlugins.ai_block_io.io_data import BaseData
++        from sentryPlugins.ai_block_io.data_access import _get_io_stage_data
++
++        io_data = _get_io_stage_data(collect)
++        self.assertEqual(
++            io_data.read, BaseData(latency=1.0, io_dump=2.0, io_length=3.0, iops=4.0)
++        )
++        self.assertEqual(
++            io_data.write, BaseData(latency=5.0, io_dump=6.0, io_length=7.0, iops=8.0)
++        )
++        self.assertEqual(
++            io_data.flush, BaseData(latency=9.0, io_dump=10.0, io_length=11.0, iops=12.0)
++        )
++        self.assertEqual(
++            io_data.discard, BaseData(latency=13.0, io_dump=14.0, io_length=15.0, iops=16.0)
++        )
+diff --git a/src/python/sentryPlugins/__init__.py b/src/python/sentryPlugins/__init__.py
+new file mode 100644
+index 0000000..e69de29
+diff --git a/src/python/sentryPlugins/ai_block_io/README.md b/src/python/sentryPlugins/ai_block_io/README.md
+new file mode 100644
+index 0000000..95c1111
+--- /dev/null
++++ b/src/python/sentryPlugins/ai_block_io/README.md
+@@ -0,0 +1 @@
++# slow_io_detection
+diff --git a/src/python/sentryPlugins/ai_block_io/__init__.py b/src/python/sentryPlugins/ai_block_io/__init__.py
+new file mode 100644
+index 0000000..e69de29
+diff --git a/src/python/sentryPlugins/ai_block_io/ai_block_io.py b/src/python/sentryPlugins/ai_block_io/ai_block_io.py
+new file mode 100644
+index 0000000..8075f5f
+--- /dev/null
++++ b/src/python/sentryPlugins/ai_block_io/ai_block_io.py
+@@ -0,0 +1,239 @@
++# coding: utf-8
++# Copyright (c) 2024 Huawei Technologies Co., Ltd.
++# sysSentry is licensed under the Mulan PSL v2.
++# You can use this software according to the terms and conditions of the Mulan PSL v2.
++# You may obtain a copy of Mulan PSL v2 at:
++# http://license.coscl.org.cn/MulanPSL2
++# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
++# PURPOSE.
++# See the Mulan PSL v2 for more details.
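++# Overview (a sketch of the control flow implemented below): main() registers
++# signal handlers, reads /etc/sysSentry/plugins/ai_block_io.ini via ConfigParser,
++# and SlowIODetection builds one DiskDetector per monitored disk, holding a
++# latency Detector and an io_dump Detector for every configured stage/iotype
++# pair. launch() then loops every period_time seconds:
++#   1. pull per-disk IO data from the collector plugin;
++#   2. ask each DiskDetector whether its windows report a slow IO event;
++#   3. report detected events through Xalarm.major() and log them;
++#   4. sleep until the next detection period.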
++
++import time
++import signal
++import logging
++from collections import defaultdict
++
++from .detector import Detector, DiskDetector
++from .threshold import ThresholdFactory, ThresholdType
++from .sliding_window import SlidingWindowFactory
++from .utils import get_data_queue_size_and_update_size
++from .config_parser import ConfigParser
++from .data_access import (
++    get_io_data_from_collect_plug,
++    check_collect_valid,
++    get_disk_type,
++    check_disk_is_available
++)
++from .io_data import MetricName
++from .alarm_report import Xalarm, Report
++
++CONFIG_FILE = "/etc/sysSentry/plugins/ai_block_io.ini"
++
++
++def sig_handler(signum, frame):
++    Report.report_pass(f"receive signal: {signum}, exiting...")
++    logging.info("Finished ai_block_io plugin running.")
++    exit(signum)
++
++
++class SlowIODetection:
++    _config_parser = None
++    _disk_list = []
++    _detector_name_list = defaultdict(list)
++    _disk_detectors = {}
++
++    def __init__(self, config_parser: ConfigParser):
++        self._config_parser = config_parser
++        self.__init_detector_name_list()
++        self.__init_detector()
++
++    def __init_detector_name_list(self):
++        disks: list = self._config_parser.disks_to_detection
++        stages: list = self._config_parser.stage
++        iotypes: list = self._config_parser.iotype
++
++        if disks is None:
++            logging.warning("no disk is specified (or 'default' is used), so ai_block_io will enable all available disks.")
++            all_available_disk_list = check_collect_valid(self._config_parser.period_time)
++            if all_available_disk_list is None:
++                Report.report_pass("get available disk error, please check whether the collector plugin is enabled. exiting...")
++                logging.critical("get available disk error, please check whether the collector plugin is enabled. exiting...")
++                exit(1)
++            if len(all_available_disk_list) == 0:
++                Report.report_pass("no available disk found. exiting...")
++                logging.critical("no available disk found. exiting...")
++                exit(1)
++            disks = all_available_disk_list
++            logging.info(f"available disk list is as follows: {disks}.")
++
++        for disk in disks:
++            tmp_disk = [disk]
++            ret = check_disk_is_available(self._config_parser.period_time, tmp_disk)
++            if not ret:
++                logging.warning(f"disk: {disk} is not available, it will be ignored.")
++                continue
++
++            disk_type_result = get_disk_type(disk)
++            if disk_type_result["ret"] == 0 and disk_type_result["message"] in (
++                '0',
++                '1',
++                '2',
++            ):
++                disk_type = int(disk_type_result["message"])
++            else:
++                logging.warning(
++                    "%s get disk type error, return %s, so it will be ignored.",
++                    disk,
++                    disk_type_result,
++                )
++                continue
++            self._disk_list.append(disk)
++            for stage in stages:
++                for iotype in iotypes:
++                    self._detector_name_list[disk].append(MetricName(disk, disk_type, stage, iotype, "latency"))
++                    self._detector_name_list[disk].append(MetricName(disk, disk_type, stage, iotype, "io_dump"))
++
++        if not self._detector_name_list:
++            Report.report_pass("the list of disks to detect is empty, ai_block_io will exit.")
++            logging.critical("the list of disks to detect is empty, ai_block_io will exit.")
++            exit(1)
++
++    def __init_detector(self):
++        train_data_duration, train_update_duration = (
++            self._config_parser.get_train_data_duration_and_train_update_duration()
++        )
++        slow_io_detection_frequency = self._config_parser.period_time
++        threshold_type = self._config_parser.algorithm_type
++        data_queue_size, update_size = get_data_queue_size_and_update_size(
++            train_data_duration, train_update_duration, slow_io_detection_frequency
++        )
++        sliding_window_type = self._config_parser.sliding_window_type
++        window_size, window_threshold = (
++            self._config_parser.get_window_size_and_window_minimum_threshold()
++        )
++
++        for disk, metric_name_list in self._detector_name_list.items():
++            disk_detector = DiskDetector(disk)
++            for metric_name in metric_name_list:
++
++                if metric_name.metric_name == 'latency':
++                    threshold = ThresholdFactory().get_threshold(
++                        threshold_type,
++                        boxplot_parameter=self._config_parser.boxplot_parameter,
++                        n_sigma_paramter=self._config_parser.n_sigma_parameter,
++                        data_queue_size=data_queue_size,
++                        data_queue_update_size=update_size,
++                    )
++                    tot_lim = self._config_parser.get_tot_lim(
++                        metric_name.disk_type, metric_name.io_access_type_name
++                    )
++                    avg_lim = self._config_parser.get_avg_lim(
++                        metric_name.disk_type, metric_name.io_access_type_name
++                    )
++                    if tot_lim is None:
++                        logging.warning(
++                            "disk %s, disk type %s, io type %s, get tot lim error, so it will be ignored.",
++                            disk,
++                            metric_name.disk_type,
++                            metric_name.io_access_type_name,
++                        )
++                    sliding_window = SlidingWindowFactory().get_sliding_window(
++                        sliding_window_type,
++                        queue_length=window_size,
++                        threshold=window_threshold,
++                        abs_threshold=tot_lim,
++                        avg_lim=avg_lim
++                    )
++                    detector = Detector(metric_name, threshold, sliding_window)
++                    disk_detector.add_detector(detector)
++                    continue
++
++                elif metric_name.metric_name == 'io_dump':
++                    threshold = ThresholdFactory().get_threshold(ThresholdType.AbsoluteThreshold)
++                    abs_threshold = None
++                    if metric_name.io_access_type_name == 'read':
++                        abs_threshold = self._config_parser.read_iodump_lim
++                    elif metric_name.io_access_type_name == 'write':
++                        abs_threshold = self._config_parser.write_iodump_lim
++                    sliding_window = SlidingWindowFactory().get_sliding_window(
++                        sliding_window_type,
++                        queue_length=window_size,
++                        threshold=window_threshold
++                    )
++                    detector = Detector(metric_name, threshold, sliding_window)
++                    threshold.set_threshold(abs_threshold)
++                    disk_detector.add_detector(detector)
++
++            logging.info(f"disk: [{disk}] add detector:\n [{disk_detector}]")
++            self._disk_detectors[disk] = disk_detector
++
++    def launch(self):
++        while True:
++            logging.debug("step0. AI threshold slow io event detection is looping.")
++
++            # Step 1: fetch IO data
++            io_data_dict_with_disk_name = get_io_data_from_collect_plug(
++                self._config_parser.period_time, self._disk_list
++            )
++            logging.debug(f"step1. Get io data: {str(io_data_dict_with_disk_name)}")
++            if io_data_dict_with_disk_name is None:
++                Report.report_pass(
++                    "get io data error, please check whether the collector plugin is enabled. exiting..."
++                )
++                exit(1)
++
++            # Step 2: slow IO detection
++            logging.debug("step2. Start to detect slow io events.")
++            slow_io_event_list = []
++            for disk, disk_detector in self._disk_detectors.items():
++                result = disk_detector.is_slow_io_event(io_data_dict_with_disk_name)
++                if result[0]:
++                    slow_io_event_list.append(result)
++            logging.debug("step2. End of slow io event detection.")
++
++            # Step 3: report slow IO events to sysSentry
++            logging.debug("step3. Report slow io event to sysSentry.")
++            for slow_io_event in slow_io_event_list:
++                alarm_content = {
++                    "alarm_source": "ai_block_io",
++                    "driver_name": slow_io_event[1],
++                    "io_type": slow_io_event[4],
++                    "reason": slow_io_event[2],
++                    "block_stack": slow_io_event[3],
++                    "alarm_type": slow_io_event[5],
++                    "details": slow_io_event[6]
++                }
++                Xalarm.major(alarm_content)
++                tmp_alarm_content = alarm_content.copy()
++                del tmp_alarm_content["details"]
++                logging.warning("[SLOW IO] " + str(tmp_alarm_content))
++                logging.warning(f'[SLOW IO] disk: {str(tmp_alarm_content.get("driver_name"))}, '
++                                f'stage: {str(tmp_alarm_content.get("block_stack"))}, '
++                                f'iotype: {str(tmp_alarm_content.get("io_type"))}, '
++                                f'type: {str(tmp_alarm_content.get("alarm_type"))}, '
++                                f'reason: {str(tmp_alarm_content.get("reason"))}')
++                logging.warning("latency: " + str(alarm_content.get("details").get("latency")))
++                logging.warning("iodump: " + str(alarm_content.get("details").get("iodump")))
++
++            # Step 4: wait for the next detection period
++            logging.debug("step4. Wait to start next slow io event detection loop.")
++            time.sleep(self._config_parser.period_time)
++
++
++def main():
++    # Step 1: register signal handlers
++    signal.signal(signal.SIGINT, sig_handler)
++    signal.signal(signal.SIGTERM, sig_handler)
++
++    # Step 2: resume from checkpoint
++    # todo:
++
++    # Step 3: read configuration
++    config_file_name = CONFIG_FILE
++    config = ConfigParser(config_file_name)
++    config.read_config_from_file()
++
++    # Step 4: start slow IO detection
++    slow_io_detection = SlowIODetection(config)
++    slow_io_detection.launch()
+diff --git a/src/python/sentryPlugins/ai_block_io/alarm_report.py b/src/python/sentryPlugins/ai_block_io/alarm_report.py
+new file mode 100644
+index 0000000..61bb145
+--- /dev/null
++++ b/src/python/sentryPlugins/ai_block_io/alarm_report.py
+@@ -0,0 +1,80 @@
++# coding: utf-8
++# Copyright (c) 2024 Huawei Technologies Co., Ltd.
++# sysSentry is licensed under the Mulan PSL v2.
++# You can use this software according to the terms and conditions of the Mulan PSL v2.
++# You may obtain a copy of Mulan PSL v2 at:
++# http://license.coscl.org.cn/MulanPSL2
++# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
++# PURPOSE.
++# See the Mulan PSL v2 for more details.
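++# Thin wrappers around the sysSentry reporting interfaces: Report forwards task
++# results for the "ai_block_io" task via syssentry.result.report_result, and
++# Xalarm raises/clears alarm id 1002 (the id configured in
++# config/tasks/ai_block_io.mod) via xalarm_report. Illustrative call (the
++# disk name here is a made-up example):
++#
++#     Xalarm.major({"alarm_source": "ai_block_io", "driver_name": "sda", ...})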
++
++import logging
++import json
++
++from xalarm.sentry_notify import (
++    xalarm_report,
++    MINOR_ALM,
++    MAJOR_ALM,
++    CRITICAL_ALM,
++    ALARM_TYPE_OCCUR,
++    ALARM_TYPE_RECOVER,
++)
++
++from syssentry.result import ResultLevel, report_result
++
++
++class Report:
++    TASK_NAME = "ai_block_io"
++
++    @staticmethod
++    def report_pass(info: str):
++        report_result(Report.TASK_NAME, ResultLevel.PASS, json.dumps({"msg": info}))
++        logging.debug(f'Report {Report.TASK_NAME} PASS: {info}')
++
++    @staticmethod
++    def report_fail(info: str):
++        report_result(Report.TASK_NAME, ResultLevel.FAIL, json.dumps({"msg": info}))
++        logging.debug(f'Report {Report.TASK_NAME} FAIL: {info}')
++
++    @staticmethod
++    def report_skip(info: str):
++        report_result(Report.TASK_NAME, ResultLevel.SKIP, json.dumps({"msg": info}))
++        logging.debug(f'Report {Report.TASK_NAME} SKIP: {info}')
++
++
++class Xalarm:
++    ALARM_ID = 1002
++
++    @staticmethod
++    def minor(info: dict):
++        info_str = json.dumps(info)
++        xalarm_report(Xalarm.ALARM_ID, MINOR_ALM, ALARM_TYPE_OCCUR, info_str)
++        logging.debug(f"Report {Xalarm.ALARM_ID} MINOR_ALM: {info_str}")
++
++    @staticmethod
++    def major(info: dict):
++        info_str = json.dumps(info)
++        xalarm_report(Xalarm.ALARM_ID, MAJOR_ALM, ALARM_TYPE_OCCUR, info_str)
++        logging.debug(f"Report {Xalarm.ALARM_ID} MAJOR_ALM: {info_str}")
++
++    @staticmethod
++    def critical(info: dict):
++        info_str = json.dumps(info)
++        xalarm_report(Xalarm.ALARM_ID, CRITICAL_ALM, ALARM_TYPE_OCCUR, info_str)
++        logging.debug(f"Report {Xalarm.ALARM_ID} CRITICAL_ALM: {info_str}")
++
++    @staticmethod
++    def minor_recover(info: dict):
++        info_str = json.dumps(info)
++        xalarm_report(Xalarm.ALARM_ID, MINOR_ALM, ALARM_TYPE_RECOVER, info_str)
++        logging.debug(f"Report {Xalarm.ALARM_ID} MINOR_ALM Recover: {info_str}")
++
++    @staticmethod
++    def major_recover(info: dict):
++        info_str = json.dumps(info)
++        xalarm_report(Xalarm.ALARM_ID, MAJOR_ALM, ALARM_TYPE_RECOVER, info_str)
++        logging.debug(f"Report {Xalarm.ALARM_ID} MAJOR_ALM Recover: {info_str}")
++
++    @staticmethod
++    def critical_recover(info: dict):
++        info_str = json.dumps(info)
++        xalarm_report(Xalarm.ALARM_ID, CRITICAL_ALM, ALARM_TYPE_RECOVER, info_str)
++        logging.debug(f"Report {Xalarm.ALARM_ID} CRITICAL_ALM Recover: {info_str}")
+diff --git a/src/python/sentryPlugins/ai_block_io/config_parser.py b/src/python/sentryPlugins/ai_block_io/config_parser.py
+new file mode 100644
+index 0000000..1bbb609
+--- /dev/null
++++ b/src/python/sentryPlugins/ai_block_io/config_parser.py
+@@ -0,0 +1,742 @@
++# coding: utf-8
++# Copyright (c) 2024 Huawei Technologies Co., Ltd.
++# sysSentry is licensed under the Mulan PSL v2.
++# You can use this software according to the terms and conditions of the Mulan PSL v2.
++# You may obtain a copy of Mulan PSL v2 at:
++# http://license.coscl.org.cn/MulanPSL2
++# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
++# PURPOSE.
++# See the Mulan PSL v2 for more details.
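++# Note on _get_config_value (defined below): each option is read from its ini
++# section, falls back to DEFAULT_CONF when absent, is coerced to value_type,
++# and is range-checked against the optional gt/ge/lt/le bounds; any violation
++# reports the reason and exits. For example, period_time is parsed as an int
++# with gt=0, and win_threshold must satisfy 0 < value <= win_size.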
++ ++import os ++import configparser ++import logging ++ ++from .alarm_report import Report ++from .threshold import ThresholdType ++from .utils import get_threshold_type_enum, get_sliding_window_type_enum, get_log_level ++from .data_access import check_detect_frequency_is_valid ++ ++ ++LOG_FORMAT = "%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s" ++ ++ALL_STAGE_LIST = [ ++ "throtl", ++ "wbt", ++ "gettag", ++ "plug", ++ "deadline", ++ "hctx", ++ "requeue", ++ "rq_driver", ++ "bio", ++] ++ALL_IOTPYE_LIST = ["read", "write"] ++DISK_TYPE_MAP = { ++ 0: "nvme_ssd", ++ 1: "sata_ssd", ++ 2: "sata_hdd", ++} ++ ++ ++def init_log_format(log_level: str): ++ logging.basicConfig(level=get_log_level(log_level.lower()), format=LOG_FORMAT) ++ if log_level.lower() not in ("info", "warning", "error", "debug"): ++ logging.warning( ++ "the log_level: %s you set is invalid, use default value: info.", log_level ++ ) ++ ++ ++class ConfigParser: ++ DEFAULT_CONF = { ++ "log": {"level": "info"}, ++ "common": { ++ "period_time": 1, ++ "disk": None, ++ "stage": "throtl,wbt,gettag,plug,deadline,hctx,requeue,rq_driver,bio", ++ "iotype": "read,write", ++ }, ++ "algorithm": { ++ "train_data_duration": 24.0, ++ "train_update_duration": 2.0, ++ "algorithm_type": get_threshold_type_enum("boxplot"), ++ "boxplot_parameter": 1.5, ++ "n_sigma_parameter": 3.0, ++ "win_type": get_sliding_window_type_enum("not_continuous"), ++ "win_size": 30, ++ "win_threshold": 6, ++ }, ++ "latency_sata_ssd": { ++ "read_avg_lim": 10000, ++ "write_avg_lim": 10000, ++ "read_tot_lim": 50000, ++ "write_tot_lim": 50000 ++ }, ++ "latency_nvme_ssd": { ++ "read_avg_lim": 10000, ++ "write_avg_lim": 10000, ++ "read_tot_lim": 50000, ++ "write_tot_lim": 50000 ++ }, ++ "latency_sata_hdd": { ++ "read_avg_lim": 15000, ++ "write_avg_lim": 15000, ++ "read_tot_lim": 50000, ++ "write_tot_lim": 50000 ++ }, ++ "iodump": { ++ "read_iodump_lim": 0, ++ "write_iodump_lim": 0 ++ } ++ } ++ ++ def __init__(self, config_file_name): ++ self._conf = ConfigParser.DEFAULT_CONF ++ self._config_file_name = config_file_name ++ ++ def _get_config_value( ++ self, ++ config_items: dict, ++ key: str, ++ value_type, ++ default_value=None, ++ gt=None, ++ ge=None, ++ lt=None, ++ le=None, ++ section=None ++ ): ++ if section is not None: ++ print_key = section + "." + key ++ else: ++ print_key = key ++ value = config_items.get(key) ++ if value is None: ++ logging.warning( ++ "config of %s not found, the default value %s will be used.", ++ print_key, ++ default_value, ++ ) ++ value = default_value ++ if not value: ++ logging.critical( ++ "the value of %s is empty, ai_block_io plug will exit.", print_key ++ ) ++ Report.report_pass( ++ f"the value of {print_key} is empty, ai_block_io plug will exit." ++ ) ++ exit(1) ++ try: ++ value = value_type(value) ++ except ValueError: ++ logging.critical( ++ "the value of %s is not a valid %s, ai_block_io plug will exit.", ++ print_key, ++ value_type, ++ ) ++ Report.report_pass( ++ f"the value of {print_key} is not a valid {value_type}, ai_block_io plug will exit." ++ ) ++ exit(1) ++ if gt is not None and value <= gt: ++ logging.critical( ++ "the value of %s is not greater than %s, ai_block_io plug will exit.", ++ print_key, ++ gt, ++ ) ++ Report.report_pass( ++ f"the value of {print_key} is not greater than {gt}, ai_block_io plug will exit." 
++ ) ++ exit(1) ++ if ge is not None and value < ge: ++ logging.critical( ++ "the value of %s is not greater than or equal to %s, ai_block_io plug will exit.", ++ print_key, ++ ge, ++ ) ++ Report.report_pass( ++ f"the value of {print_key} is not greater than or equal to {ge}, ai_block_io plug will exit." ++ ) ++ exit(1) ++ if lt is not None and value >= lt: ++ logging.critical( ++ "the value of %s is not less than %s, ai_block_io plug will exit.", ++ print_key, ++ lt, ++ ) ++ Report.report_pass( ++ f"the value of {print_key} is not less than {lt}, ai_block_io plug will exit." ++ ) ++ exit(1) ++ if le is not None and value > le: ++ logging.critical( ++ "the value of %s is not less than or equal to %s, ai_block_io plug will exit.", ++ print_key, ++ le, ++ ) ++ Report.report_pass( ++ f"the value of {print_key} is not less than or equal to {le}, ai_block_io plug will exit." ++ ) ++ exit(1) ++ ++ return value ++ ++ def _read_period_time(self, items_common: dict): ++ self._conf["common"]["period_time"] = self._get_config_value( ++ items_common, ++ "period_time", ++ int, ++ self.DEFAULT_CONF["common"]["period_time"], ++ gt=0 ++ ) ++ frequency = self._conf["common"]["period_time"] ++ ret = check_detect_frequency_is_valid(frequency) ++ if ret is None: ++ log = f"period_time: {frequency} is invalid, "\ ++ f"Check whether the value range is too large or is not an "\ ++ f"integer multiple of period_time.. exiting..." ++ Report.report_pass(log) ++ logging.critical(log) ++ exit(1) ++ ++ def _read_disks_to_detect(self, items_common: dict): ++ disks_to_detection = items_common.get("disk") ++ if disks_to_detection is None: ++ logging.warning("config of disk not found, the default value will be used.") ++ self._conf["common"]["disk"] = None ++ return ++ disks_to_detection = disks_to_detection.strip() ++ disks_to_detection = disks_to_detection.lower() ++ if not disks_to_detection: ++ logging.critical("the value of disk is empty, ai_block_io plug will exit.") ++ Report.report_pass( ++ "the value of disk is empty, ai_block_io plug will exit." 
++ ) ++ exit(1) ++ disk_list = disks_to_detection.split(",") ++ disk_list = [disk.strip() for disk in disk_list] ++ if len(disk_list) == 1 and disk_list[0] == "default": ++ self._conf["common"]["disk"] = None ++ return ++ if len(disk_list) > 10: ++ ten_disk_list = disk_list[0:10] ++ other_disk_list = disk_list[10:] ++ logging.warning(f"disk only support maximum is 10, disks: {ten_disk_list} will be retained, other: {other_disk_list} will be ignored.") ++ else: ++ ten_disk_list = disk_list ++ set_ten_disk_list = set(ten_disk_list) ++ if len(ten_disk_list) > len(set_ten_disk_list): ++ tmp = ten_disk_list ++ ten_disk_list = list(set_ten_disk_list) ++ logging.warning(f"disk exist duplicate, it will be deduplicate, before: {tmp}, after: {ten_disk_list}") ++ self._conf["common"]["disk"] = ten_disk_list ++ ++ def _read_train_data_duration(self, items_algorithm: dict): ++ self._conf["algorithm"]["train_data_duration"] = self._get_config_value( ++ items_algorithm, ++ "train_data_duration", ++ float, ++ self.DEFAULT_CONF["algorithm"]["train_data_duration"], ++ gt=0, ++ le=720, ++ ) ++ ++ def _read_train_update_duration(self, items_algorithm: dict): ++ default_train_update_duration = self.DEFAULT_CONF["algorithm"][ ++ "train_update_duration" ++ ] ++ if default_train_update_duration > self._conf["algorithm"]["train_data_duration"]: ++ default_train_update_duration = ( ++ self._conf["algorithm"]["train_data_duration"] / 2 ++ ) ++ self._conf["algorithm"]["train_update_duration"] = self._get_config_value( ++ items_algorithm, ++ "train_update_duration", ++ float, ++ default_train_update_duration, ++ gt=0, ++ le=self._conf["algorithm"]["train_data_duration"], ++ ) ++ ++ def _read_algorithm_type_and_parameter(self, items_algorithm: dict): ++ algorithm_type = items_algorithm.get("algorithm_type") ++ if algorithm_type is None: ++ default_algorithm_type = self._conf["algorithm"]["algorithm_type"] ++ logging.warning(f"algorithm_type not found, it will be set default: {default_algorithm_type}") ++ else: ++ self._conf["algorithm"]["algorithm_type"] = get_threshold_type_enum(algorithm_type) ++ ++ if self._conf["algorithm"]["algorithm_type"] is None: ++ logging.critical( ++ "the algorithm_type: %s you set is invalid. ai_block_io plug will exit.", ++ algorithm_type, ++ ) ++ Report.report_pass( ++ f"the algorithm_type: {algorithm_type} you set is invalid. ai_block_io plug will exit." 
++ ) ++ exit(1) ++ ++ elif self._conf["algorithm"]["algorithm_type"] == ThresholdType.NSigmaThreshold: ++ self._conf["algorithm"]["n_sigma_parameter"] = self._get_config_value( ++ items_algorithm, ++ "n_sigma_parameter", ++ float, ++ self.DEFAULT_CONF["algorithm"]["n_sigma_parameter"], ++ gt=0, ++ le=10, ++ ) ++ elif ( ++ self._conf["algorithm"]["algorithm_type"] == ThresholdType.BoxplotThreshold ++ ): ++ self._conf["algorithm"]["boxplot_parameter"] = self._get_config_value( ++ items_algorithm, ++ "boxplot_parameter", ++ float, ++ self.DEFAULT_CONF["algorithm"]["boxplot_parameter"], ++ gt=0, ++ le=10, ++ ) ++ ++ def _read_stage(self, items_algorithm: dict): ++ stage_str = items_algorithm.get("stage") ++ if stage_str is None: ++ stage_str = self.DEFAULT_CONF["common"]["stage"] ++ logging.warning(f"stage not found, it will be set default: {stage_str}") ++ else: ++ stage_str = stage_str.strip() ++ ++ stage_str = stage_str.lower() ++ stage_list = stage_str.split(",") ++ stage_list = [stage.strip() for stage in stage_list] ++ if len(stage_list) == 1 and stage_list[0] == "": ++ logging.critical("stage value not allow is empty, exiting...") ++ exit(1) ++ if len(stage_list) == 1 and stage_list[0] == "default": ++ logging.warning( ++ "stage will enable default value: %s", ++ self.DEFAULT_CONF["common"]["stage"], ++ ) ++ self._conf["common"]["stage"] = ALL_STAGE_LIST ++ return ++ for stage in stage_list: ++ if stage not in ALL_STAGE_LIST: ++ logging.critical( ++ "stage: %s is not valid stage, ai_block_io will exit...", stage ++ ) ++ exit(1) ++ dup_stage_list = set(stage_list) ++ if "bio" not in dup_stage_list: ++ logging.critical("stage must contains bio stage, exiting...") ++ exit(1) ++ self._conf["common"]["stage"] = dup_stage_list ++ ++ def _read_iotype(self, items_algorithm: dict): ++ iotype_str = items_algorithm.get("iotype") ++ if iotype_str is None: ++ iotype_str = self.DEFAULT_CONF["common"]["iotype"] ++ logging.warning(f"iotype not found, it will be set default: {iotype_str}") ++ else: ++ iotype_str = iotype_str.strip() ++ ++ iotype_str = iotype_str.lower() ++ iotype_list = iotype_str.split(",") ++ iotype_list = [iotype.strip() for iotype in iotype_list] ++ if len(iotype_list) == 1 and iotype_list[0] == "": ++ logging.critical("iotype value not allow is empty, exiting...") ++ exit(1) ++ if len(iotype_list) == 1 and iotype_list[0] == "default": ++ logging.warning( ++ "iotype will enable default value: %s", ++ self.DEFAULT_CONF["common"]["iotype"], ++ ) ++ self._conf["common"]["iotype"] = ALL_IOTPYE_LIST ++ return ++ for iotype in iotype_list: ++ if iotype not in ALL_IOTPYE_LIST: ++ logging.critical( ++ "iotype: %s is not valid iotype, ai_block_io will exit...", iotype ++ ) ++ exit(1) ++ dup_iotype_list = set(iotype_list) ++ self._conf["common"]["iotype"] = dup_iotype_list ++ ++ def _read_sliding_window_type(self, items_sliding_window: dict): ++ sliding_window_type = items_sliding_window.get("win_type") ++ ++ if sliding_window_type is None: ++ default_sliding_window_type = self._conf["algorithm"]["win_type"] ++ logging.warning(f"win_type not found, it will be set default: {default_sliding_window_type}") ++ return ++ ++ sliding_window_type = sliding_window_type.strip() ++ if sliding_window_type is not None: ++ self._conf["algorithm"]["win_type"] = ( ++ get_sliding_window_type_enum(sliding_window_type) ++ ) ++ if self._conf["algorithm"]["win_type"] is None: ++ logging.critical( ++ "the win_type: %s you set is invalid. 
ai_block_io plug will exit.",
++                sliding_window_type,
++            )
++            Report.report_pass(
++                f"the win_type: {sliding_window_type} you set is invalid. ai_block_io plug will exit."
++            )
++            exit(1)
++
++    def _read_window_size(self, items_sliding_window: dict):
++        self._conf["algorithm"]["win_size"] = self._get_config_value(
++            items_sliding_window,
++            "win_size",
++            int,
++            self.DEFAULT_CONF["algorithm"]["win_size"],
++            gt=0,
++            le=300,
++        )
++
++    def _read_window_minimum_threshold(self, items_sliding_window: dict):
++        default_window_minimum_threshold = self.DEFAULT_CONF["algorithm"]["win_threshold"]
++        self._conf["algorithm"]["win_threshold"] = (
++            self._get_config_value(
++                items_sliding_window,
++                "win_threshold",
++                int,
++                default_window_minimum_threshold,
++                gt=0,
++                le=self._conf["algorithm"]["win_size"],
++            )
++        )
++
++    def read_config_from_file(self):
++        if not os.path.exists(self._config_file_name):
++            init_log_format(self._conf["log"]["level"])
++            logging.critical(
++                "config file %s not found, ai_block_io plug will exit.",
++                self._config_file_name,
++            )
++            Report.report_pass(
++                f"config file {self._config_file_name} not found, ai_block_io plug will exit."
++            )
++            exit(1)
++
++        con = configparser.ConfigParser()
++        try:
++            con.read(self._config_file_name, encoding="utf-8")
++        except configparser.Error as e:
++            init_log_format(self._conf["log"]["level"])
++            logging.critical(
++                "config file read error: %s, ai_block_io plug will exit.", e
++            )
++            Report.report_pass(
++                f"config file read error: {e}, ai_block_io plug will exit."
++            )
++            exit(1)
++
++        if con.has_section("log"):
++            items_log = dict(con.items("log"))
++            # Case 1: no log value -> use the default value
++            # Case 2: log value present but empty or invalid -> use the default value
++            # Case 3: log value present and valid -> use that value
++            self._conf["log"]["level"] = items_log.get(
++                "level", self.DEFAULT_CONF["log"]["level"]
++            )
++            init_log_format(self._conf["log"]["level"])
++        else:
++            init_log_format(self._conf["log"]["level"])
++            logging.warning(
++                "log section parameter not found, it will be set to default value."
++            )
++
++        if con.has_section("common"):
++            items_common = dict(con.items("common"))
++
++            self._read_period_time(items_common)
++            self._read_disks_to_detect(items_common)
++            self._read_stage(items_common)
++            self._read_iotype(items_common)
++        else:
++            Report.report_pass("not found common section. exiting...")
++            logging.critical("not found common section. exiting...")
++            exit(1)
++
++        if con.has_section("algorithm"):
++            items_algorithm = dict(con.items("algorithm"))
++            self._read_train_data_duration(items_algorithm)
++            self._read_train_update_duration(items_algorithm)
++            self._read_algorithm_type_and_parameter(items_algorithm)
++            self._read_sliding_window_type(items_algorithm)
++            self._read_window_size(items_algorithm)
++            self._read_window_minimum_threshold(items_algorithm)
++
++        if con.has_section("latency_sata_ssd"):
++            items_latency_sata_ssd = dict(con.items("latency_sata_ssd"))
++            self._conf["latency_sata_ssd"]["read_tot_lim"] = self._get_config_value(
++                items_latency_sata_ssd,
++                "read_tot_lim",
++                int,
++                self.DEFAULT_CONF["latency_sata_ssd"]["read_tot_lim"],
++                gt=0,
++                section="latency_sata_ssd"
++            )
++            self._conf["latency_sata_ssd"]["write_tot_lim"] = self._get_config_value(
++                items_latency_sata_ssd,
++                "write_tot_lim",
++                int,
++                self.DEFAULT_CONF["latency_sata_ssd"]["write_tot_lim"],
++                gt=0,
++                section="latency_sata_ssd"
++            )
++            self._conf["latency_sata_ssd"]["read_avg_lim"] = self._get_config_value(
++                items_latency_sata_ssd,
++                "read_avg_lim",
++                int,
++                self.DEFAULT_CONF["latency_sata_ssd"]["read_avg_lim"],
++                gt=0,
++                section="latency_sata_ssd"
++            )
++            self._conf["latency_sata_ssd"]["write_avg_lim"] = self._get_config_value(
++                items_latency_sata_ssd,
++                "write_avg_lim",
++                int,
++                self.DEFAULT_CONF["latency_sata_ssd"]["write_avg_lim"],
++                gt=0,
++                section="latency_sata_ssd"
++            )
++            if self._conf["latency_sata_ssd"]["read_avg_lim"] >= self._conf["latency_sata_ssd"]["read_tot_lim"]:
++                Report.report_pass("latency_sata_ssd.read_avg_lim must < latency_sata_ssd.read_tot_lim . exiting...")
++                logging.critical("latency_sata_ssd.read_avg_lim must < latency_sata_ssd.read_tot_lim . exiting...")
++                exit(1)
++            if self._conf["latency_sata_ssd"]["write_avg_lim"] >= self._conf["latency_sata_ssd"]["write_tot_lim"]:
++                Report.report_pass("latency_sata_ssd.write_avg_lim must < latency_sata_ssd.write_tot_lim . exiting...")
++                logging.critical("latency_sata_ssd.write_avg_lim must < latency_sata_ssd.write_tot_lim . exiting...")
++                exit(1)
++        else:
++            Report.report_pass("not found latency_sata_ssd section. exiting...")
++            logging.critical("not found latency_sata_ssd section. exiting...")
++            exit(1)
++
++        if con.has_section("latency_nvme_ssd"):
++            items_latency_nvme_ssd = dict(con.items("latency_nvme_ssd"))
++            self._conf["latency_nvme_ssd"]["read_tot_lim"] = self._get_config_value(
++                items_latency_nvme_ssd,
++                "read_tot_lim",
++                int,
++                self.DEFAULT_CONF["latency_nvme_ssd"]["read_tot_lim"],
++                gt=0,
++                section="latency_nvme_ssd"
++            )
++            self._conf["latency_nvme_ssd"]["write_tot_lim"] = self._get_config_value(
++                items_latency_nvme_ssd,
++                "write_tot_lim",
++                int,
++                self.DEFAULT_CONF["latency_nvme_ssd"]["write_tot_lim"],
++                gt=0,
++                section="latency_nvme_ssd"
++            )
++            self._conf["latency_nvme_ssd"]["read_avg_lim"] = self._get_config_value(
++                items_latency_nvme_ssd,
++                "read_avg_lim",
++                int,
++                self.DEFAULT_CONF["latency_nvme_ssd"]["read_avg_lim"],
++                gt=0,
++                section="latency_nvme_ssd"
++            )
++            self._conf["latency_nvme_ssd"]["write_avg_lim"] = self._get_config_value(
++                items_latency_nvme_ssd,
++                "write_avg_lim",
++                int,
++                self.DEFAULT_CONF["latency_nvme_ssd"]["write_avg_lim"],
++                gt=0,
++                section="latency_nvme_ssd"
++            )
++            if self._conf["latency_nvme_ssd"]["read_avg_lim"] >= self._conf["latency_nvme_ssd"]["read_tot_lim"]:
++                Report.report_pass("latency_nvme_ssd.read_avg_lim must < latency_nvme_ssd.read_tot_lim . 
exiting...") ++ logging.critical("latency_nvme_ssd.read_avg_lim must < latency_nvme_ssd.read_tot_lim . exiting...") ++ exit(1) ++ if self._conf["latency_nvme_ssd"]["write_avg_lim"] >= self._conf["latency_nvme_ssd"]["write_tot_lim"]: ++ Report.report_pass("latency_nvme_ssd.write_avg_lim must < latency_nvme_ssd.write_tot_lim . exiting...") ++ logging.critical("latency_nvme_ssd.write_avg_lim must < latency_nvme_ssd.write_tot_lim . exiting...") ++ exit(1) ++ else: ++ Report.report_pass("not found latency_nvme_ssd section. exiting...") ++ logging.critical("not found latency_nvme_ssd section. exiting...") ++ exit(1) ++ ++ if con.has_section("latency_sata_hdd"): ++ items_latency_sata_hdd = dict(con.items("latency_sata_hdd")) ++ self._conf["latency_sata_hdd"]["read_tot_lim"] = self._get_config_value( ++ items_latency_sata_hdd, ++ "read_tot_lim", ++ int, ++ self.DEFAULT_CONF["latency_sata_hdd"]["read_tot_lim"], ++ gt=0, ++ section="latency_sata_hdd" ++ ) ++ self._conf["latency_sata_hdd"]["write_tot_lim"] = self._get_config_value( ++ items_latency_sata_hdd, ++ "write_tot_lim", ++ int, ++ self.DEFAULT_CONF["latency_sata_hdd"]["write_tot_lim"], ++ gt=0, ++ section="latency_sata_hdd" ++ ) ++ self._conf["latency_sata_hdd"]["read_avg_lim"] = self._get_config_value( ++ items_latency_sata_hdd, ++ "read_avg_lim", ++ int, ++ self.DEFAULT_CONF["latency_sata_hdd"]["read_avg_lim"], ++ gt=0, ++ section="latency_sata_hdd" ++ ) ++ self._conf["latency_sata_hdd"]["write_avg_lim"] = self._get_config_value( ++ items_latency_sata_hdd, ++ "write_avg_lim", ++ int, ++ self.DEFAULT_CONF["latency_sata_hdd"]["write_avg_lim"], ++ gt=0, ++ section="latency_sata_hdd" ++ ) ++ if self._conf["latency_sata_hdd"]["read_avg_lim"] >= self._conf["latency_sata_hdd"]["read_tot_lim"]: ++ Report.report_pass("latency_sata_hdd.read_avg_lim must < latency_sata_hdd.read_tot_lim . exiting...") ++ logging.critical("latency_sata_hdd.read_avg_lim must < latency_sata_hdd.read_tot_lim . exiting...") ++ exit(1) ++ if self._conf["latency_sata_hdd"]["write_avg_lim"] >= self._conf["latency_sata_hdd"]["write_tot_lim"]: ++ Report.report_pass("latency_sata_hdd.write_avg_lim must < latency_sata_hdd.write_tot_lim . exiting...") ++ logging.critical("latency_sata_hdd.write_avg_lim must < latency_sata_hdd.write_tot_lim . exiting...") ++ exit(1) ++ else: ++ Report.report_pass("not found latency_sata_hdd section. exiting...") ++ logging.critical("not found latency_sata_hdd section. exiting...") ++ exit(1) ++ ++ if con.has_section("iodump"): ++ items_iodump = dict(con.items("iodump")) ++ self._conf["iodump"]["read_iodump_lim"] = self._get_config_value( ++ items_iodump, ++ "read_iodump_lim", ++ int, ++ self.DEFAULT_CONF["iodump"]["read_iodump_lim"], ++ ge=0 ++ ) ++ self._conf["iodump"]["write_iodump_lim"] = self._get_config_value( ++ items_iodump, ++ "write_iodump_lim", ++ int, ++ self.DEFAULT_CONF["iodump"]["write_iodump_lim"], ++ ge=0 ++ ) ++ else: ++ Report.report_pass("not found iodump section. exiting...") ++ logging.critical("not found iodump section. 
exiting...") ++ exit(1) ++ ++ self.__print_all_config_value() ++ ++ def __repr__(self) -> str: ++ return str(self._conf) ++ ++ def __str__(self) -> str: ++ return str(self._conf) ++ ++ def __print_all_config_value(self): ++ logging.info("all config is follow:\n %s", self) ++ ++ def get_tot_lim(self, disk_type, io_type): ++ if io_type == "read": ++ return self._conf.get( ++ f"latency_{DISK_TYPE_MAP.get(disk_type, '')}", {} ++ ).get("read_tot_lim", None) ++ elif io_type == "write": ++ return self._conf.get( ++ f"latency_{DISK_TYPE_MAP.get(disk_type, '')}", {} ++ ).get("write_tot_lim", None) ++ else: ++ return None ++ ++ def get_avg_lim(self, disk_type, io_type): ++ if io_type == "read": ++ return self._conf.get( ++ f"latency_{DISK_TYPE_MAP.get(disk_type, '')}", {} ++ ).get("read_avg_lim", None) ++ elif io_type == "write": ++ return self._conf.get( ++ f"latency_{DISK_TYPE_MAP.get(disk_type, '')}", {} ++ ).get("write_avg_lim", None) ++ else: ++ return None ++ ++ def get_train_data_duration_and_train_update_duration(self): ++ return ( ++ self._conf["algorithm"]["train_data_duration"], ++ self._conf["algorithm"]["train_update_duration"], ++ ) ++ ++ def get_window_size_and_window_minimum_threshold(self): ++ return ( ++ self._conf["algorithm"]["win_size"], ++ self._conf["algorithm"]["win_threshold"], ++ ) ++ ++ @property ++ def period_time(self): ++ return self._conf["common"]["period_time"] ++ ++ @property ++ def algorithm_type(self): ++ return self._conf["algorithm"]["algorithm_type"] ++ ++ @property ++ def sliding_window_type(self): ++ return self._conf["algorithm"]["win_type"] ++ ++ @property ++ def train_data_duration(self): ++ return self._conf["algorithm"]["train_data_duration"] ++ ++ @property ++ def train_update_duration(self): ++ return self._conf["algorithm"]["train_update_duration"] ++ ++ @property ++ def window_size(self): ++ return self._conf["algorithm"]["win_size"] ++ ++ @property ++ def window_minimum_threshold(self): ++ return self._conf["algorithm"]["win_threshold"] ++ ++ @property ++ def absolute_threshold(self): ++ return self._conf["common"]["absolute_threshold"] ++ ++ @property ++ def log_level(self): ++ return self._conf["log"]["level"] ++ ++ @property ++ def disks_to_detection(self): ++ return self._conf["common"]["disk"] ++ ++ @property ++ def stage(self): ++ return self._conf["common"]["stage"] ++ ++ @property ++ def iotype(self): ++ return self._conf["common"]["iotype"] ++ ++ @property ++ def boxplot_parameter(self): ++ return self._conf["algorithm"]["boxplot_parameter"] ++ ++ @property ++ def n_sigma_parameter(self): ++ return self._conf["algorithm"]["n_sigma_parameter"] ++ ++ @property ++ def read_iodump_lim(self): ++ return self._conf["iodump"]["read_iodump_lim"] ++ ++ @property ++ def write_iodump_lim(self): ++ return self._conf["iodump"]["write_iodump_lim"] +\ No newline at end of file +diff --git a/src/python/sentryPlugins/ai_block_io/data_access.py b/src/python/sentryPlugins/ai_block_io/data_access.py +new file mode 100644 +index 0000000..2f2d607 +--- /dev/null ++++ b/src/python/sentryPlugins/ai_block_io/data_access.py +@@ -0,0 +1,127 @@ ++# coding: utf-8 ++# Copyright (c) 2024 Huawei Technologies Co., Ltd. ++# sysSentry is licensed under the Mulan PSL v2. ++# You can use this software according to the terms and conditions of the Mulan PSL v2. 
++# You may obtain a copy of Mulan PSL v2 at: ++# http://license.coscl.org.cn/MulanPSL2 ++# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR ++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR ++# PURPOSE. ++# See the Mulan PSL v2 for more details. ++ ++import json ++import logging ++ ++from sentryCollector.collect_plugin import ( ++ Result_Messages, ++ get_io_data, ++ is_iocollect_valid, ++ get_disk_type ++) ++ ++ ++from .io_data import IOStageData, IOData ++ ++COLLECT_STAGES = [ ++ "throtl", ++ "wbt", ++ "gettag", ++ "plug", ++ "bfq", ++ "hctx", ++ "requeue", ++ "rq_driver", ++ "bio", ++ "iocost", ++] ++ ++ ++def check_collect_valid(period): ++ data_raw = is_iocollect_valid(period) ++ if data_raw["ret"] == 0: ++ try: ++ data = json.loads(data_raw["message"]) ++ except Exception as e: ++ logging.warning(f"get valid devices failed, occur exception: {e}") ++ return None ++ if not data: ++ logging.warning(f"get valid devices failed, return {data_raw}") ++ return None ++ return [k for k in data.keys()] ++ else: ++ logging.warning(f"get valid devices failed, return {data_raw}") ++ return None ++ ++ ++def check_detect_frequency_is_valid(period): ++ data_raw = is_iocollect_valid(period) ++ if data_raw["ret"] == 0: ++ try: ++ data = json.loads(data_raw["message"]) ++ except Exception as e: ++ return None ++ if not data: ++ return None ++ return [k for k in data.keys()] ++ else: ++ return None ++ ++ ++def check_disk_is_available(period_time, disk): ++ data_raw = is_iocollect_valid(period_time, disk) ++ if data_raw["ret"] == 0: ++ try: ++ data = json.loads(data_raw["message"]) ++ except Exception as e: ++ return False ++ if not data: ++ return False ++ return True ++ else: ++ return False ++ ++ ++def _get_raw_data(period, disk_list): ++ return get_io_data( ++ period, ++ disk_list, ++ COLLECT_STAGES, ++ ["read", "write", "flush", "discard"], ++ ) ++ ++ ++def _get_io_stage_data(data): ++ io_stage_data = IOStageData() ++ for data_type in ("read", "write", "flush", "discard"): ++ if data_type in data: ++ getattr(io_stage_data, data_type).latency = data[data_type][0] ++ getattr(io_stage_data, data_type).io_dump = data[data_type][1] ++ getattr(io_stage_data, data_type).io_length = data[data_type][2] ++ getattr(io_stage_data, data_type).iops = data[data_type][3] ++ return io_stage_data ++ ++ ++def get_io_data_from_collect_plug(period, disk_list): ++ data_raw = _get_raw_data(period, disk_list) ++ if data_raw["ret"] == 0: ++ ret = {} ++ try: ++ data = json.loads(data_raw["message"]) ++ except json.decoder.JSONDecodeError as e: ++ logging.warning(f"get io data failed, {e}") ++ return None ++ ++ for disk in data: ++ disk_data = data[disk] ++ disk_ret = IOData() ++ for k, v in disk_data.items(): ++ try: ++ getattr(disk_ret, k) ++ setattr(disk_ret, k, _get_io_stage_data(v)) ++ except AttributeError: ++ logging.debug(f"no attr {k}") ++ continue ++ ret[disk] = disk_ret ++ return ret ++ logging.warning(f'get io data failed with message: {data_raw["message"]}') ++ return None +diff --git a/src/python/sentryPlugins/ai_block_io/detector.py b/src/python/sentryPlugins/ai_block_io/detector.py +new file mode 100644 +index 0000000..27fb7f7 +--- /dev/null ++++ b/src/python/sentryPlugins/ai_block_io/detector.py +@@ -0,0 +1,156 @@ ++# coding: utf-8 ++# Copyright (c) 2024 Huawei Technologies Co., Ltd. ++# sysSentry is licensed under the Mulan PSL v2. 
++# You can use this software according to the terms and conditions of the Mulan PSL v2.
++# You may obtain a copy of Mulan PSL v2 at:
++# http://license.coscl.org.cn/MulanPSL2
++# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
++# PURPOSE.
++# See the Mulan PSL v2 for more details.
++import logging
++from datetime import datetime
++
++from .io_data import MetricName
++from .threshold import Threshold
++from .sliding_window import SlidingWindow
++from .utils import get_metric_value_from_io_data_dict_by_metric_name
++
++
++class Detector:
++
++    def __init__(self, metric_name: MetricName, threshold: Threshold, sliding_window: SlidingWindow):
++        self._metric_name = metric_name
++        self._threshold = threshold
++        # so that when the threshold updates, the latest threshold can be logged together with the metric name
++        self._threshold.set_metric_name(self._metric_name)
++        self._slidingWindow = sliding_window
++        self._threshold.attach_observer(self._slidingWindow)
++        self._count = None
++
++    @property
++    def metric_name(self):
++        return self._metric_name
++
++    def get_sliding_window_data(self):
++        return self._slidingWindow.get_data()
++
++    def is_slow_io_event(self, io_data_dict_with_disk_name: dict):
++        if self._count is None:
++            self._count = datetime.now()
++        else:
++            now_time = datetime.now()
++            time_diff = (now_time - self._count).total_seconds()
++            if time_diff >= 60:
++                logging.info(f"({self._metric_name}) 's latest ai threshold is: {self._threshold.get_threshold()}.")
++                self._count = None
++
++        logging.debug(f'enter Detector: {self}')
++        metric_value = get_metric_value_from_io_data_dict_by_metric_name(io_data_dict_with_disk_name, self._metric_name)
++        if metric_value is None:
++            logging.debug('not found metric value, so return None.')
++            return (False, False), None, None, None, None
++        logging.debug(f'input metric value: {str(metric_value)}')
++        self._threshold.push_latest_data_to_queue(metric_value)
++        detection_result = self._slidingWindow.is_slow_io_event(metric_value)
++        # when an abnormal period is detected, the Detector logs it at info level
++        if detection_result[0][1]:
++            logging.info(f'[abnormal_period]: disk: {self._metric_name.disk_name}, '
++                         f'stage: {self._metric_name.stage_name}, '
++                         f'iotype: {self._metric_name.io_access_type_name}, '
++                         f'type: {self._metric_name.metric_name}, '
++                         f'ai_threshold: {round(detection_result[2], 3)}, '
++                         f'curr_val: {metric_value}')
++        else:
++            logging.debug(f'Detection result: {str(detection_result)}')
++        logging.debug(f'exit Detector: {self}')
++        return detection_result
++
++    def __repr__(self):
++        return (f'disk_name: {self._metric_name.disk_name}, stage_name: {self._metric_name.stage_name},'
++                f' io_type_name: {self._metric_name.io_access_type_name},'
++                f' metric_name: {self._metric_name.metric_name}, threshold_type: {self._threshold},'
++                f' sliding_window_type: {self._slidingWindow}')
++
++
++def set_to_str(parameter: set):
++    ret = ""
++    parameter = list(parameter)
++    length = len(parameter)
++    for i in range(length):
++        if i == 0:
++            ret += parameter[i]
++        else:
++            ret += "," + parameter[i]
++    return ret
++
++
++class DiskDetector:
++
++    def __init__(self, disk_name: str):
++        self._disk_name = disk_name
++        self._detector_list = []
++
++    def add_detector(self, detector: Detector):
++        self._detector_list.append(detector)
++
++    def get_detector_list_window(self):
++        latency_wins = {"read": {}, "write": {}}
++        iodump_wins = {"read": {}, "write": {}}
++        for detector in self._detector_list:
++            if detector.metric_name.metric_name == 'latency':
++                latency_wins[detector.metric_name.io_access_type_name][detector.metric_name.stage_name] = detector.get_sliding_window_data()
++            elif detector.metric_name.metric_name == 'io_dump':
++                iodump_wins[detector.metric_name.io_access_type_name][detector.metric_name.stage_name] = detector.get_sliding_window_data()
++        return latency_wins, iodump_wins
++
++    def is_slow_io_event(self, io_data_dict_with_disk_name: dict):
++        diagnosis_info = {"bio": [], "rq_driver": [], "kernel_stack": []}
++        for detector in self._detector_list:
++            # result layout: (slow io event detected?, abnormal period detected?), window, ai threshold, absolute threshold
++            # example: (False, False), self._io_data_queue, self._ai_threshold, self._abs_threshold
++            result = detector.is_slow_io_event(io_data_dict_with_disk_name)
++            if result[0][0]:
++                if detector.metric_name.stage_name == "bio":
++                    diagnosis_info["bio"].append(detector.metric_name)
++                elif detector.metric_name.stage_name == "rq_driver":
++                    diagnosis_info["rq_driver"].append(detector.metric_name)
++                else:
++                    diagnosis_info["kernel_stack"].append(detector.metric_name)
++
++        if len(diagnosis_info["bio"]) == 0:
++            return False, None, None, None, None, None, None
++
++        driver_name = self._disk_name
++        reason = "unknown"
++        block_stack = set()
++        io_type = set()
++        alarm_type = set()
++
++        for key, value in diagnosis_info.items():
++            for metric_name in value:
++                block_stack.add(metric_name.stage_name)
++                io_type.add(metric_name.io_access_type_name)
++                alarm_type.add(metric_name.metric_name)
++
++        latency_wins, iodump_wins = self.get_detector_list_window()
++        details = {"latency": latency_wins, "iodump": iodump_wins}
++
++        io_press = {"throtl", "wbt", "iocost", "bfq"}
++        driver_slow = {"rq_driver"}
++        kernel_slow = {"gettag", "plug", "deadline", "hctx", "requeue"}
++
++        if not io_press.isdisjoint(block_stack):
++            reason = "io_press"
++        elif not driver_slow.isdisjoint(block_stack):
++            reason = "driver_slow"
++        elif not kernel_slow.isdisjoint(block_stack):
++            reason = "kernel_slow"
++
++        return True, driver_name, reason, set_to_str(block_stack), set_to_str(io_type), set_to_str(alarm_type), details
++
++    def __repr__(self):
++        msg = f'disk: {self._disk_name}, '
++        for detector in self._detector_list:
++            msg += f'\n detector: [{detector}]'
++        return msg
+diff --git a/src/python/sentryPlugins/ai_block_io/io_data.py b/src/python/sentryPlugins/ai_block_io/io_data.py
+new file mode 100644
+index 0000000..6042911
+--- /dev/null
++++ b/src/python/sentryPlugins/ai_block_io/io_data.py
+@@ -0,0 +1,54 @@
++# coding: utf-8
++# Copyright (c) 2024 Huawei Technologies Co., Ltd.
++# sysSentry is licensed under the Mulan PSL v2.
++# You can use this software according to the terms and conditions of the Mulan PSL v2.
++# You may obtain a copy of Mulan PSL v2 at:
++# http://license.coscl.org.cn/MulanPSL2
++# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
++# PURPOSE.
++# See the Mulan PSL v2 for more details.
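++# Data model for one collector sample. Per disk and per stage, the collector
++# returns a 4-item list per iotype, which data_access._get_io_stage_data() maps
++# positionally into BaseData (the shape exercised by test_parse_collect_data):
++#
++#     {"read": [latency, io_dump, io_length, iops], "write": [...], ...}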
++ ++from dataclasses import dataclass, field ++from datetime import datetime ++from typing import Optional ++ ++ ++@dataclass ++class BaseData: ++ latency: Optional[float] = field(default_factory=lambda: None) ++ io_dump: Optional[int] = field(default_factory=lambda: None) ++ io_length: Optional[int] = field(default_factory=lambda: None) ++ iops: Optional[int] = field(default_factory=lambda: None) ++ ++ ++@dataclass ++class IOStageData: ++ read: BaseData = field(default_factory=lambda: BaseData()) ++ write: BaseData = field(default_factory=lambda: BaseData()) ++ flush: BaseData = field(default_factory=lambda: BaseData()) ++ discard: BaseData = field(default_factory=lambda: BaseData()) ++ ++ ++@dataclass ++class IOData: ++ throtl: IOStageData = field(default_factory=lambda: IOStageData()) ++ wbt: IOStageData = field(default_factory=lambda: IOStageData()) ++ gettag: IOStageData = field(default_factory=lambda: IOStageData()) ++ iocost: IOStageData = field(default_factory=lambda: IOStageData()) ++ plug: IOStageData = field(default_factory=lambda: IOStageData()) ++ bfq: IOStageData = field(default_factory=lambda: IOStageData()) ++ hctx: IOStageData = field(default_factory=lambda: IOStageData()) ++ requeue: IOStageData = field(default_factory=lambda: IOStageData()) ++ rq_driver: IOStageData = field(default_factory=lambda: IOStageData()) ++ bio: IOStageData = field(default_factory=lambda: IOStageData()) ++ time_stamp: float = field(default_factory=lambda: datetime.now().timestamp()) ++ ++ ++@dataclass(frozen=True) ++class MetricName: ++ disk_name: str ++ disk_type: int ++ stage_name: str ++ io_access_type_name: str ++ metric_name: str +diff --git a/src/python/sentryPlugins/ai_block_io/sliding_window.py b/src/python/sentryPlugins/ai_block_io/sliding_window.py +new file mode 100644 +index 0000000..a13033f +--- /dev/null ++++ b/src/python/sentryPlugins/ai_block_io/sliding_window.py +@@ -0,0 +1,129 @@ ++# coding: utf-8 ++# Copyright (c) 2024 Huawei Technologies Co., Ltd. ++# sysSentry is licensed under the Mulan PSL v2. ++# You can use this software according to the terms and conditions of the Mulan PSL v2. ++# You may obtain a copy of Mulan PSL v2 at: ++# http://license.coscl.org.cn/MulanPSL2 ++# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR ++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR ++# PURPOSE. ++# See the Mulan PSL v2 for more details. 
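The dataclasses above form a fixed three-level layout: IOData holds one IOStageData per kernel stage, and each stage holds one BaseData per IO type. A hedged usage sketch, assuming the installed package layout from this patch and made-up values:

    from dataclasses import asdict
    from sentryPlugins.ai_block_io.io_data import BaseData, IOStageData, IOData, MetricName  # assumed import path

    io_data_dict = {"sda": IOData(bio=IOStageData(read=BaseData(latency=120.0)))}
    mn = MetricName("sda", 0, "bio", "read", "latency")
    # utils.get_metric_value_from_io_data_dict_by_metric_name walks this same path:
    value = asdict(io_data_dict[mn.disk_name])[mn.stage_name][mn.io_access_type_name][mn.metric_name]
    print(value)  # 120.0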
++
++from enum import Enum, unique
++import numpy as np
++
++
++@unique
++class SlidingWindowType(Enum):
++    NotContinuousSlidingWindow = 0
++    ContinuousSlidingWindow = 1
++    MedianSlidingWindow = 2
++
++
++class SlidingWindow:
++    def __init__(self, queue_length: int, threshold: int, abs_threshold: int = None, avg_lim: int = None):
++        self._queue_length = queue_length
++        self._queue_threshold = threshold
++        self._ai_threshold = None
++        self._abs_threshold = abs_threshold
++        self._avg_lim = avg_lim
++        self._io_data_queue = []
++        self._io_data_queue_abnormal_tag = []
++
++    def is_abnormal(self, data):
++        if self._avg_lim is not None and data < self._avg_lim:
++            return False
++        if self._ai_threshold is not None and data > self._ai_threshold:
++            return True
++        if self._abs_threshold is not None and data > self._abs_threshold:
++            return True
++        # no threshold exceeded: the period is not abnormal
++        return False
++
++    def push(self, data: float):
++        if len(self._io_data_queue) == self._queue_length:
++            self._io_data_queue.pop(0)
++            self._io_data_queue_abnormal_tag.pop(0)
++        self._io_data_queue.append(data)
++        tag = self.is_abnormal(data)
++        self._io_data_queue_abnormal_tag.append(tag)
++        return tag
++
++    def update(self, threshold):
++        if self._ai_threshold == threshold:
++            return
++        self._ai_threshold = threshold
++        self._io_data_queue_abnormal_tag.clear()
++        for data in self._io_data_queue:
++            self._io_data_queue_abnormal_tag.append(self.is_abnormal(data))
++
++    def is_slow_io_event(self, data):
++        return (False, False), self._io_data_queue, self._ai_threshold, self._abs_threshold, self._avg_lim
++
++    def get_data(self):
++        return self._io_data_queue
++
++    def __repr__(self):
++        return "[SlidingWindow]"
++
++
++class NotContinuousSlidingWindow(SlidingWindow):
++    def is_slow_io_event(self, data):
++        is_abnormal_period = super().push(data)
++        is_slow_io_event = False
++        # only judge a slow IO event once the window is full and a threshold is available
++        if len(self._io_data_queue) == self._queue_length and (self._ai_threshold is not None or self._abs_threshold is not None):
++            if self._io_data_queue_abnormal_tag.count(True) >= self._queue_threshold:
++                is_slow_io_event = True
++        return (is_slow_io_event, is_abnormal_period), self._io_data_queue, self._ai_threshold, self._abs_threshold, self._avg_lim
++
++    def __repr__(self):
++        return f"[NotContinuousSlidingWindow, window size: {self._queue_length}, threshold: {self._queue_threshold}]"
++
++
++class ContinuousSlidingWindow(SlidingWindow):
++    def is_slow_io_event(self, data):
++        is_abnormal_period = super().push(data)
++        is_slow_io_event = False
++        if len(self._io_data_queue) == self._queue_length and (self._ai_threshold is not None or self._abs_threshold is not None):
++            consecutive_count = 0
++            for tag in self._io_data_queue_abnormal_tag:
++                if tag:
++                    consecutive_count += 1
++                    if consecutive_count >= self._queue_threshold:
++                        is_slow_io_event = True
++                        break
++                else:
++                    consecutive_count = 0
++        return (is_slow_io_event, is_abnormal_period), self._io_data_queue, self._ai_threshold, self._abs_threshold, self._avg_lim
++
++    def __repr__(self):
++        return f"[ContinuousSlidingWindow, window size: {self._queue_length}, threshold: {self._queue_threshold}]"
++
++
++class MedianSlidingWindow(SlidingWindow):
++    def is_slow_io_event(self, data):
++        is_abnormal_period = super().push(data)
++        is_slow_io_event = False
++        if len(self._io_data_queue) == self._queue_length and (self._ai_threshold is not None or self._abs_threshold is not None):
++            median = np.median(self._io_data_queue)
++            if (self._ai_threshold is not None and median > self._ai_threshold) or (self._abs_threshold is not None and median > self._abs_threshold):
++                is_slow_io_event = True
++        return (is_slow_io_event, is_abnormal_period), self._io_data_queue, self._ai_threshold, self._abs_threshold, self._avg_lim
++
++    def __repr__(self):
++        return f"[MedianSlidingWindow, window size: {self._queue_length}]"
++
++
++class SlidingWindowFactory:
++    def get_sliding_window(
++        self, sliding_window_type: SlidingWindowType, *args, **kwargs
++    ):
++        if sliding_window_type == SlidingWindowType.NotContinuousSlidingWindow:
++            return NotContinuousSlidingWindow(*args, **kwargs)
++        elif sliding_window_type == SlidingWindowType.ContinuousSlidingWindow:
++            return ContinuousSlidingWindow(*args, **kwargs)
++        elif sliding_window_type == SlidingWindowType.MedianSlidingWindow:
++            return MedianSlidingWindow(*args, **kwargs)
++        else:
++            return NotContinuousSlidingWindow(*args, **kwargs)
+diff --git a/src/python/sentryPlugins/ai_block_io/threshold.py b/src/python/sentryPlugins/ai_block_io/threshold.py
+new file mode 100644
+index 0000000..e202bb8
+--- /dev/null
++++ b/src/python/sentryPlugins/ai_block_io/threshold.py
+@@ -0,0 +1,178 @@
++# coding: utf-8
++# Copyright (c) 2024 Huawei Technologies Co., Ltd.
++# sysSentry is licensed under the Mulan PSL v2.
++# You can use this software according to the terms and conditions of the Mulan PSL v2.
++# You may obtain a copy of Mulan PSL v2 at:
++# http://license.coscl.org.cn/MulanPSL2
++# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
++# PURPOSE.
++# See the Mulan PSL v2 for more details.
++import logging
++from enum import Enum
++import queue
++import numpy as np
++import math
++
++from .sliding_window import SlidingWindow
++
++
++class ThresholdState(Enum):
++    INIT = 0
++    START = 1
++
++
++class Threshold:
++
++    def __init__(self, data_queue_size: int = 10000, data_queue_update_size: int = 1000):
++        self._observer = None
++        self.data_queue = queue.Queue(data_queue_size)
++        self.data_queue_update_size = data_queue_update_size
++        self.new_data_size = 0
++        self.threshold_state = ThresholdState.INIT
++        self.threshold = math.inf
++        self.metric_name = None
++
++    def set_threshold(self, threshold):
++        self.threshold = threshold
++        self.threshold_state = ThresholdState.START
++        self.notify_observer()
++
++    def set_metric_name(self, metric_name):
++        self.metric_name = metric_name
++
++    def get_threshold(self):
++        if self.threshold_state == ThresholdState.INIT:
++            return None
++        return self.threshold
++
++    def is_abnormal(self, data):
++        if self.threshold_state == ThresholdState.INIT:
++            return False
++        return data >= self.threshold
++
++    # observer pattern: when the threshold is updated, the sliding window's copy is refreshed automatically
++    def attach_observer(self, observer: SlidingWindow):
++        self._observer = observer
++
++    def notify_observer(self):
++        if self._observer is not None:
++            self._observer.update(self.threshold)
++
++    def push_latest_data_to_queue(self, data):
++        pass
++
++    def __repr__(self):
++        return "Threshold"
++
++    def __str__(self):
++        return "Threshold"
++
++
++class AbsoluteThreshold(Threshold):
++    def __init__(self, data_queue_size: int = 10000, data_queue_update_size: int = 1000, **kwargs):
++        super().__init__(data_queue_size, data_queue_update_size)
++
++    def push_latest_data_to_queue(self, data):
++        pass
++
++    def __repr__(self):
++        return "[AbsoluteThreshold]"
++
++    def __str__(self):
++        return "absolute"
++
++
++class BoxplotThreshold(Threshold):
++    def __init__(self, boxplot_parameter: float = 1.5, data_queue_size: int = 10000,
data_queue_update_size: int = 1000, **kwargs): ++ super().__init__(data_queue_size, data_queue_update_size) ++ self.parameter = boxplot_parameter ++ ++ def _update_threshold(self): ++ old_threshold = self.threshold ++ data = list(self.data_queue.queue) ++ q1 = np.percentile(data, 25) ++ q3 = np.percentile(data, 75) ++ iqr = q3 - q1 ++ self.threshold = q3 + self.parameter * iqr ++ if self.threshold_state == ThresholdState.INIT: ++ self.threshold_state = ThresholdState.START ++ logging.info(f"MetricName: [{self.metric_name}]'s threshold update, old is: {old_threshold} -> new is: {self.threshold}") ++ self.notify_observer() ++ ++ def push_latest_data_to_queue(self, data): ++ if data < 1e-6: ++ return ++ try: ++ self.data_queue.put(data, block=False) ++ except queue.Full: ++ self.data_queue.get() ++ self.data_queue.put(data) ++ self.new_data_size += 1 ++ if (self.data_queue.full() and (self.threshold_state == ThresholdState.INIT or ++ (self.threshold_state == ThresholdState.START and ++ self.new_data_size >= self.data_queue_update_size))): ++ self._update_threshold() ++ self.new_data_size = 0 ++ ++ def __repr__(self): ++ return f"[BoxplotThreshold, param is: {self.parameter}, train_size: {self.data_queue.maxsize}, update_size: {self.data_queue_update_size}]" ++ ++ def __str__(self): ++ return "boxplot" ++ ++ ++class NSigmaThreshold(Threshold): ++ def __init__(self, n_sigma_parameter: float = 3.0, data_queue_size: int = 10000, data_queue_update_size: int = 1000, **kwargs): ++ super().__init__(data_queue_size, data_queue_update_size) ++ self.parameter = n_sigma_parameter ++ ++ def _update_threshold(self): ++ old_threshold = self.threshold ++ data = list(self.data_queue.queue) ++ mean = np.mean(data) ++ std = np.std(data) ++ self.threshold = mean + self.parameter * std ++ if self.threshold_state == ThresholdState.INIT: ++ self.threshold_state = ThresholdState.START ++ logging.info(f"MetricName: [{self.metric_name}]'s threshold update, old is: {old_threshold} -> new is: {self.threshold}") ++ self.notify_observer() ++ ++ def push_latest_data_to_queue(self, data): ++ if data < 1e-6: ++ return ++ try: ++ self.data_queue.put(data, block=False) ++ except queue.Full: ++ self.data_queue.get() ++ self.data_queue.put(data) ++ self.new_data_size += 1 ++ if (self.data_queue.full() and (self.threshold_state == ThresholdState.INIT or ++ (self.threshold_state == ThresholdState.START and ++ self.new_data_size >= self.data_queue_update_size))): ++ self._update_threshold() ++ self.new_data_size = 0 ++ ++ def __repr__(self): ++ return f"[NSigmaThreshold, param is: {self.parameter}, train_size: {self.data_queue.maxsize}, update_size: {self.data_queue_update_size}]" ++ ++ def __str__(self): ++ return "n_sigma" ++ ++ ++class ThresholdType(Enum): ++ AbsoluteThreshold = 0 ++ BoxplotThreshold = 1 ++ NSigmaThreshold = 2 ++ ++ ++class ThresholdFactory: ++ def get_threshold(self, threshold_type: ThresholdType, *args, **kwargs): ++ if threshold_type == ThresholdType.AbsoluteThreshold: ++ return AbsoluteThreshold(*args, **kwargs) ++ elif threshold_type == ThresholdType.BoxplotThreshold: ++ return BoxplotThreshold(*args, **kwargs) ++ elif threshold_type == ThresholdType.NSigmaThreshold: ++ return NSigmaThreshold(*args, **kwargs) ++ else: ++ raise ValueError(f"Invalid threshold type: {threshold_type}") +diff --git a/src/python/sentryPlugins/ai_block_io/utils.py b/src/python/sentryPlugins/ai_block_io/utils.py +new file mode 100644 +index 0000000..7d2390b +--- /dev/null ++++ b/src/python/sentryPlugins/ai_block_io/utils.py +@@ 
-0,0 +1,73 @@ ++# coding: utf-8 ++# Copyright (c) 2024 Huawei Technologies Co., Ltd. ++# sysSentry is licensed under the Mulan PSL v2. ++# You can use this software according to the terms and conditions of the Mulan PSL v2. ++# You may obtain a copy of Mulan PSL v2 at: ++# http://license.coscl.org.cn/MulanPSL2 ++# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR ++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR ++# PURPOSE. ++# See the Mulan PSL v2 for more details. ++ ++import logging ++from dataclasses import asdict ++ ++ ++from .threshold import ThresholdType ++from .sliding_window import SlidingWindowType ++from .io_data import MetricName, IOData ++ ++ ++def get_threshold_type_enum(algorithm_type: str): ++ if algorithm_type.lower() == "boxplot": ++ return ThresholdType.BoxplotThreshold ++ if algorithm_type.lower() == "n_sigma": ++ return ThresholdType.NSigmaThreshold ++ return None ++ ++ ++def get_sliding_window_type_enum(sliding_window_type: str): ++ if sliding_window_type.lower() == "not_continuous": ++ return SlidingWindowType.NotContinuousSlidingWindow ++ if sliding_window_type.lower() == "continuous": ++ return SlidingWindowType.ContinuousSlidingWindow ++ if sliding_window_type.lower() == "median": ++ return SlidingWindowType.MedianSlidingWindow ++ return None ++ ++ ++def get_metric_value_from_io_data_dict_by_metric_name( ++ io_data_dict: dict, metric_name: MetricName ++): ++ try: ++ io_data: IOData = io_data_dict[metric_name.disk_name] ++ io_stage_data = asdict(io_data)[metric_name.stage_name] ++ base_data = io_stage_data[metric_name.io_access_type_name] ++ metric_value = base_data[metric_name.metric_name] ++ return metric_value ++ except KeyError: ++ return None ++ ++ ++def get_data_queue_size_and_update_size( ++ training_data_duration: float, ++ train_update_duration: float, ++ slow_io_detect_frequency: int, ++): ++ data_queue_size = int(training_data_duration * 60 * 60 / slow_io_detect_frequency) ++ update_size = int(train_update_duration * 60 * 60 / slow_io_detect_frequency) ++ return data_queue_size, update_size ++ ++ ++def get_log_level(log_level: str): ++ if log_level.lower() == "debug": ++ return logging.DEBUG ++ elif log_level.lower() == "info": ++ return logging.INFO ++ elif log_level.lower() == "warning": ++ return logging.WARNING ++ elif log_level.lower() == "error": ++ return logging.ERROR ++ elif log_level.lower() == "critical": ++ return logging.CRITICAL ++ return logging.INFO +diff --git a/src/python/sentryPlugins/avg_block_io/__init__.py b/src/python/sentryPlugins/avg_block_io/__init__.py +new file mode 100644 +index 0000000..e69de29 +diff --git a/src/python/sentryPlugins/avg_block_io/avg_block_io.py b/src/python/sentryPlugins/avg_block_io/avg_block_io.py +new file mode 100644 +index 0000000..899d517 +--- /dev/null ++++ b/src/python/sentryPlugins/avg_block_io/avg_block_io.py +@@ -0,0 +1,189 @@ ++# coding: utf-8 ++# Copyright (c) 2024 Huawei Technologies Co., Ltd. ++# sysSentry is licensed under the Mulan PSL v2. ++# You can use this software according to the terms and conditions of the Mulan PSL v2. ++# You may obtain a copy of Mulan PSL v2 at: ++# http://license.coscl.org.cn/MulanPSL2 ++# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR ++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR ++# PURPOSE. ++# See the Mulan PSL v2 for more details. 
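With the defaults shipped in ai_block_io.ini (train_data_duration=24 hours, train_update_duration=2 hours) and a 1-second detection period, get_data_queue_size_and_update_size in ai_block_io/utils.py above works out to:

    data_queue_size = int(24 * 60 * 60 / 1)  # 86400 samples retained for training
    update_size = int(2 * 60 * 60 / 1)       # re-fit the threshold after 7200 new samples
    print(data_queue_size, update_size)      # 86400 7200

These two numbers are what BoxplotThreshold and NSigmaThreshold receive as data_queue_size and data_queue_update_size.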
++import logging
++import signal
++import configparser
++import time
++
++from .config import read_config_log, read_config_common, read_config_algorithm, read_config_latency, read_config_iodump, read_config_stage
++from .stage_window import IoWindow, IoDumpWindow
++from .module_conn import avg_is_iocollect_valid, avg_get_io_data, report_alarm_fail, process_report_data, sig_handler, get_disk_type_by_name, check_disk_list_validation
++from .utils import update_avg_and_check_abnormal
++
++CONFIG_FILE = "/etc/sysSentry/plugins/avg_block_io.ini"
++
++
++def init_io_win(io_dic, config, common_param):
++    """initialize windows of latency, iodump, and dict of avg_value"""
++    iotype_list = io_dic["iotype_list"]
++    io_data = {}
++    io_avg_value = {}
++    for disk_name in io_dic["disk_list"]:
++        io_data[disk_name] = {}
++        io_avg_value[disk_name] = {}
++        curr_disk_type = get_disk_type_by_name(disk_name)
++        for stage_name in io_dic["stage_list"]:
++            io_data[disk_name][stage_name] = {}
++            io_avg_value[disk_name][stage_name] = {}
++            # parse the stage-specific configuration
++            curr_stage_param = read_config_stage(config, stage_name, iotype_list, curr_disk_type)
++            for rw in iotype_list:
++                io_data[disk_name][stage_name][rw] = {}
++                io_avg_value[disk_name][stage_name][rw] = [0, 0]
++
++                # create latency and iodump windows for each rw type
++                avg_lim_key = "{}_avg_lim".format(rw)
++                avg_time_key = "{}_avg_time".format(rw)
++                tot_lim_key = "{}_tot_lim".format(rw)
++                iodump_lim_key = "{}_iodump_lim".format(rw)
++
++                # look up values, preferring curr_stage_param and falling back to common_param
++                avg_lim_value = curr_stage_param.get(avg_lim_key, common_param.get(curr_disk_type, {}).get(avg_lim_key))
++                avg_time_value = curr_stage_param.get(avg_time_key, common_param.get(curr_disk_type, {}).get(avg_time_key))
++                tot_lim_value = curr_stage_param.get(tot_lim_key, common_param.get(curr_disk_type, {}).get(tot_lim_key))
++                iodump_lim_value = curr_stage_param.get(iodump_lim_key, common_param.get("iodump", {}).get(iodump_lim_key))
++
++                if avg_lim_value and avg_time_value and tot_lim_value:
++                    io_data[disk_name][stage_name][rw]["latency"] = IoWindow(window_size=io_dic["win_size"], window_threshold=io_dic["win_threshold"], abnormal_multiple=avg_time_value, abnormal_multiple_lim=avg_lim_value, abnormal_time=tot_lim_value)
++                    logging.debug("Successfully create {}-{}-{}-latency window".format(disk_name, stage_name, rw))
++
++                if iodump_lim_value is not None:
++                    io_data[disk_name][stage_name][rw]["iodump"] = IoDumpWindow(window_size=io_dic["win_size"], window_threshold=io_dic["win_threshold"], abnormal_time=iodump_lim_value)
++                    logging.debug("Successfully create {}-{}-{}-iodump window".format(disk_name, stage_name, rw))
++    return io_data, io_avg_value
++
++
++def get_valid_disk_stage_list(io_dic, config_disk, config_stage):
++    """get disk_list and stage_list by sentryCollector"""
++    json_data = avg_is_iocollect_valid(io_dic, config_disk, config_stage)
++
++    all_disk_set = json_data.keys()
++    all_stage_set = set()
++    for disk_stage_list in json_data.values():
++        all_stage_set.update(disk_stage_list)
++
++    disk_list = [key for key in all_disk_set if key in config_disk]
++    not_in_disk_list = [key for key in config_disk if key not in all_disk_set]
++
++    if not config_disk and not not_in_disk_list:
++        disk_list = [key for key in all_disk_set]
++
++    if not disk_list:
++        report_alarm_fail("Cannot get valid disk name")
++
++    disk_list = check_disk_list_validation(disk_list)
++
++    disk_list = disk_list[:10] if len(disk_list) > 10 else disk_list
++
++    if not config_disk:
++        logging.info(f"Default common.disk using disk={disk_list}")
++    elif sorted(disk_list) != sorted(config_disk):
++        logging.warning(f"Set common.disk to {disk_list}")
++
++    stage_list = [key for key in all_stage_set if key in config_stage]
++    not_in_stage_list = [key for key in config_stage if key not in all_stage_set]
++
++    if not_in_stage_list:
++        report_alarm_fail(f"Invalid common.stage_list config, cannot set {not_in_stage_list}")
++
++    if not config_stage:
++        stage_list = [key for key in all_stage_set]
++
++    if not stage_list:
++        report_alarm_fail("Cannot get valid stage name.")
++
++    if not config_stage:
++        logging.info(f"Default common.stage using stage={stage_list}")
++
++    return disk_list, stage_list
++
++
++def main_loop(io_dic, io_data, io_avg_value):
++    """main loop of avg_block_io"""
++    period_time = io_dic["period_time"]
++    disk_list = io_dic["disk_list"]
++    stage_list = io_dic["stage_list"]
++    iotype_list = io_dic["iotype_list"]
++    win_size = io_dic["win_size"]
++    # start the main loop
++    while True:
++        # sleep for the configured period
++        time.sleep(period_time)
++
++        # fetch period data from the collection module
++        is_success, curr_period_data = avg_get_io_data(io_dic)
++        if not is_success:
++            logging.error(f"{curr_period_data['msg']}")
++            continue
++
++        # process the period data
++        reach_size = False
++        for disk_name in disk_list:
++            for stage_name in stage_list:
++                for rw in iotype_list:
++                    if disk_name in curr_period_data and stage_name in curr_period_data[disk_name] and rw in curr_period_data[disk_name][stage_name]:
++                        io_key = (disk_name, stage_name, rw)
++                        reach_size = update_avg_and_check_abnormal(curr_period_data, io_key, win_size, io_avg_value, io_data)
++
++        # skip alarm judgement until win_size is reached
++        if not reach_size:
++            continue
++
++        # judge abnormal windows and abnormal scenarios
++        for disk_name in disk_list:
++            for rw in iotype_list:
++                process_report_data(disk_name, rw, io_data)
++
++
++def main():
++    """main func"""
++    # register handlers for the stop signals SIGINT(-2)/SIGTERM(-15)
++    signal.signal(signal.SIGINT, sig_handler)
++    signal.signal(signal.SIGTERM, sig_handler)
++
++    log_level = read_config_log(CONFIG_FILE)
++    log_format = "%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s"
++    logging.basicConfig(level=log_level, format=log_format)
++
++    # initialize config parsing
++    config = configparser.ConfigParser(comment_prefixes=('#', ';'))
++    try:
++        config.read(CONFIG_FILE)
++    except configparser.Error:
++        report_alarm_fail("Failed to read config file")
++
++    io_dic = {}
++
++    # read config file -- common section
++    io_dic["period_time"], disk, stage, io_dic["iotype_list"] = read_config_common(config)
++
++    # collector module handshake, is_iocollect_valid()
++    io_dic["disk_list"], io_dic["stage_list"] = get_valid_disk_stage_list(io_dic, disk, stage)
++
++    logging.debug(f"disk={io_dic['disk_list']}, stage={io_dic['stage_list']}")
++
++    if "bio" not in io_dic["stage_list"]:
++        report_alarm_fail("Cannot run avg_block_io without bio stage")
++
++    # initialize windows -- read config matching the result returned by is_iocollect_valid
++    # step1. parse common config --- algorithm
++    io_dic["win_size"], io_dic["win_threshold"] = read_config_algorithm(config)
++
++    # step2. parse common config --- latency_xxx
++    common_param = read_config_latency(config)
++
++    # step3. parse common config --- iodump
++    common_param['iodump'] = read_config_iodump(config)
++
++    # step4. create the windows in a loop
++    io_data, io_avg_value = init_io_win(io_dic, config, common_param)
++
++    main_loop(io_dic, io_data, io_avg_value)
+diff --git a/src/python/sentryPlugins/avg_block_io/config.py b/src/python/sentryPlugins/avg_block_io/config.py
+new file mode 100644
+index 0000000..c1e8ab1
+--- /dev/null
++++ b/src/python/sentryPlugins/avg_block_io/config.py
+@@ -0,0 +1,208 @@
++import configparser
++import logging
++import os
++
++from .module_conn import report_alarm_fail
++from sentryCollector.collect_plugin import Disk_Type
++
++
++CONF_LOG = 'log'
++CONF_LOG_LEVEL = 'level'
++LogLevel = {
++    "debug": logging.DEBUG,
++    "info": logging.INFO,
++    "warning": logging.WARNING,
++    "error": logging.ERROR,
++    "critical": logging.CRITICAL
++}
++
++CONF_COMMON = 'common'
++CONF_COMMON_DISK = 'disk'
++CONF_COMMON_STAGE = 'stage'
++CONF_COMMON_IOTYPE = 'iotype'
++CONF_COMMON_PER_TIME = 'period_time'
++
++CONF_ALGO = 'algorithm'
++CONF_ALGO_SIZE = 'win_size'
++CONF_ALGO_THRE = 'win_threshold'
++
++CONF_LATENCY = 'latency_{}'
++CONF_IODUMP = 'iodump'
++
++
++DEFAULT_PARAM = {
++    CONF_LOG: {
++        CONF_LOG_LEVEL: 'info'
++    }, CONF_COMMON: {
++        CONF_COMMON_DISK: 'default',
++        CONF_COMMON_STAGE: 'default',
++        CONF_COMMON_IOTYPE: 'read,write',
++        CONF_COMMON_PER_TIME: 1
++    }, CONF_ALGO: {
++        CONF_ALGO_SIZE: 30,
++        CONF_ALGO_THRE: 6
++    }, 'latency_nvme_ssd': {
++        'read_avg_lim': 10000,
++        'write_avg_lim': 10000,
++        'read_avg_time': 3,
++        'write_avg_time': 3,
++        'read_tot_lim': 50000,
++        'write_tot_lim': 50000,
++    }, 'latency_sata_ssd': {
++        'read_avg_lim': 10000,
++        'write_avg_lim': 10000,
++        'read_avg_time': 3,
++        'write_avg_time': 3,
++        'read_tot_lim': 50000,
++        'write_tot_lim': 50000,
++    }, 'latency_sata_hdd': {
++        'read_avg_lim': 15000,
++        'write_avg_lim': 15000,
++        'read_avg_time': 3,
++        'write_avg_time': 3,
++        'read_tot_lim': 50000,
++        'write_tot_lim': 50000
++    }, CONF_IODUMP: {
++        'read_iodump_lim': 0,
++        'write_iodump_lim': 0
++    }
++}
++
++
++def get_section_value(section_name, config):
++    common_param = {}
++    config_sec = config[section_name]
++    for config_key in DEFAULT_PARAM[section_name]:
++        if config_key in config_sec:
++            if not config_sec[config_key].isdecimal():
++                report_alarm_fail(f"Invalid {section_name}.{config_key} config.")
++            common_param[config_key] = int(config_sec[config_key])
++        else:
++            common_param[config_key] = DEFAULT_PARAM[section_name][config_key]
++            logging.warning(f"Unset {section_name}.{config_key} in config file, use {common_param[config_key]} as default")
++    return common_param
++
++
++def read_config_log(filename):
++    """read config file, get [log] section value"""
++    default_log_level = DEFAULT_PARAM[CONF_LOG][CONF_LOG_LEVEL]
++    if not os.path.exists(filename):
++        return LogLevel.get(default_log_level)
++
++    config = configparser.ConfigParser()
++    config.read(filename)
++
++    log_level = config.get(CONF_LOG, CONF_LOG_LEVEL, fallback=default_log_level)
++    if log_level.lower() in LogLevel:
++        return LogLevel.get(log_level.lower())
++    return LogLevel.get(default_log_level)
++
++
++def read_config_common(config):
++    """read config file, get [common] section value"""
++    if not config.has_section(CONF_COMMON):
++        report_alarm_fail(f"Cannot find {CONF_COMMON} section in config file")
++
++    try:
++        disk_name = config.get(CONF_COMMON, CONF_COMMON_DISK).lower()
++        disk = [] if disk_name == "default" else disk_name.split(",")
++    except configparser.NoOptionError:
++        disk = []
++        logging.warning(f"Unset {CONF_COMMON}.{CONF_COMMON_DISK}, set to default")
++
++    try:
++        stage_name = config.get(CONF_COMMON, CONF_COMMON_STAGE).lower()
++        stage = [] if stage_name == "default" else stage_name.split(",")
++    except configparser.NoOptionError:
++        stage = []
++        logging.warning(f"Unset {CONF_COMMON}.{CONF_COMMON_STAGE}, set to default")
++
++    if len(disk) > 10:
++        logging.warning(f"Too many {CONF_COMMON}.disk entries, only the first 10 disks are used")
++        disk = disk[:10]
++
++    try:
++        iotype_name = config.get(CONF_COMMON, CONF_COMMON_IOTYPE).lower().split(",")
++        iotype_list = [rw.lower() for rw in iotype_name if rw.lower() in ['read', 'write']]
++        err_iotype = [rw.lower() for rw in iotype_name if rw.lower() not in ['read', 'write']]
++
++        if err_iotype:
++            report_alarm_fail(f"Invalid {CONF_COMMON}.{CONF_COMMON_IOTYPE} config")
++
++    except configparser.NoOptionError:
++        iotype_list = DEFAULT_PARAM[CONF_COMMON][CONF_COMMON_IOTYPE].split(",")
++        logging.warning(f"Unset {CONF_COMMON}.{CONF_COMMON_IOTYPE}, use {iotype_list} as default")
++
++    try:
++        period_time = int(config.get(CONF_COMMON, CONF_COMMON_PER_TIME))
++        if not (1 <= period_time <= 300):
++            raise ValueError("Invalid period_time")
++    except ValueError:
++        report_alarm_fail(f"Invalid {CONF_COMMON}.{CONF_COMMON_PER_TIME}")
++    except configparser.NoOptionError:
++        period_time = DEFAULT_PARAM[CONF_COMMON][CONF_COMMON_PER_TIME]
++        logging.warning(f"Unset {CONF_COMMON}.{CONF_COMMON_PER_TIME}, use {period_time} as default")
++
++    return period_time, disk, stage, iotype_list
++
++
++def read_config_algorithm(config):
++    """read config file, get [algorithm] section value"""
++    if not config.has_section(CONF_ALGO):
++        report_alarm_fail(f"Cannot find {CONF_ALGO} section in config file")
++
++    try:
++        win_size = int(config.get(CONF_ALGO, CONF_ALGO_SIZE))
++        if not (1 <= win_size <= 300):
++            raise ValueError(f"Invalid {CONF_ALGO}.{CONF_ALGO_SIZE}")
++    except ValueError:
++        report_alarm_fail(f"Invalid {CONF_ALGO}.{CONF_ALGO_SIZE} config")
++    except configparser.NoOptionError:
++        win_size = DEFAULT_PARAM[CONF_ALGO][CONF_ALGO_SIZE]
++        logging.warning(f"Unset {CONF_ALGO}.{CONF_ALGO_SIZE}, use {win_size} as default")
++
++    try:
++        win_threshold = int(config.get(CONF_ALGO, CONF_ALGO_THRE))
++        if win_threshold < 1 or win_threshold > 300 or win_threshold > win_size:
++            raise ValueError(f"Invalid {CONF_ALGO}.{CONF_ALGO_THRE}")
++    except ValueError:
++        report_alarm_fail(f"Invalid {CONF_ALGO}.{CONF_ALGO_THRE} config")
++    except configparser.NoOptionError:
++        win_threshold = DEFAULT_PARAM[CONF_ALGO][CONF_ALGO_THRE]
++        logging.warning(f"Unset {CONF_ALGO}.{CONF_ALGO_THRE}, use {win_threshold} as default")
++
++    return win_size, win_threshold
++
++
++def read_config_latency(config):
++    """read config file, get [latency_xxx] section value"""
++    common_param = {}
++    for type_name in Disk_Type:
++        section_name = CONF_LATENCY.format(Disk_Type[type_name])
++        if not config.has_section(section_name):
++            report_alarm_fail(f"Cannot find {section_name} section in config file")
++
++        common_param[Disk_Type[type_name]] = get_section_value(section_name, config)
++    return common_param
++
++
++def read_config_iodump(config):
++    """read config file, get [iodump] section value"""
++    if not config.has_section(CONF_IODUMP):
++        report_alarm_fail(f"Cannot find {CONF_IODUMP} section in config file")
++
++    return get_section_value(CONF_IODUMP, config)
++
++
++def read_config_stage(config, stage, iotype_list, curr_disk_type):
++    """read config file, get [STAGE_NAME_diskType] section value"""
++    res = {}
++    section_name = f"{stage}_{curr_disk_type}"
++    if not config.has_section(section_name):
++        return res
++
++    for key in config[section_name]:
++        if config[section_name][key].isdecimal():
++            res[key] = int(config[section_name][key])
++
++    return res
+diff --git a/src/python/sentryPlugins/avg_block_io/module_conn.py b/src/python/sentryPlugins/avg_block_io/module_conn.py
+new file mode 100644
+index 0000000..a67ef45
+--- /dev/null
++++ b/src/python/sentryPlugins/avg_block_io/module_conn.py
+@@ -0,0 +1,145 @@
++# coding: utf-8
++# Copyright (c) 2024 Huawei Technologies Co., Ltd.
++# sysSentry is licensed under the Mulan PSL v2.
++# You can use this software according to the terms and conditions of the Mulan PSL v2.
++# You may obtain a copy of Mulan PSL v2 at:
++# http://license.coscl.org.cn/MulanPSL2
++# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
++# PURPOSE.
++# See the Mulan PSL v2 for more details.
++import json
++import logging
++import sys
++import time
++
++from .utils import is_abnormal, get_win_data, log_slow_win
++from sentryCollector.collect_plugin import is_iocollect_valid, get_io_data, Result_Messages, get_disk_type, Disk_Type
++from syssentry.result import ResultLevel, report_result
++from xalarm.sentry_notify import xalarm_report, MINOR_ALM, ALARM_TYPE_OCCUR
++
++
++TASK_NAME = "avg_block_io"
++
++def sig_handler(signum, _f):
++    """stop avg_block_io"""
++    report_result(TASK_NAME, ResultLevel.PASS, json.dumps({}))
++    logging.info("Finished avg_block_io plugin running.")
++    sys.exit(0)
++
++def avg_get_io_data(io_dic):
++    """get_io_data from sentryCollector"""
++    logging.debug(f"send to sentryCollector get_io_data: period={io_dic['period_time']}, "
++                  f"disk={io_dic['disk_list']}, stage={io_dic['stage_list']}, iotype={io_dic['iotype_list']}")
++    res = get_io_data(io_dic["period_time"], io_dic["disk_list"], io_dic["stage_list"], io_dic["iotype_list"])
++    return check_result_validation(res, 'get io data')
++
++
++def avg_is_iocollect_valid(io_dic, config_disk, config_stage):
++    """is_iocollect_valid from sentryCollector"""
++    logging.debug(f"send to sentryCollector is_iocollect_valid: period={io_dic['period_time']}, "
++                  f"disk={config_disk}, stage={config_stage}")
++    res = is_iocollect_valid(io_dic["period_time"], config_disk, config_stage)
++    is_success, data = check_result_validation(res, 'check config validation')
++    if not is_success:
++        report_alarm_fail(f"{data['msg']}")
++    return data
++
++
++def check_result_validation(res, reason):
++    """check validation of result from sentryCollector"""
++    if 'ret' not in res or 'message' not in res:
++        return False, {'msg': f"Failed to {reason}: Cannot connect to sentryCollector"}
++    if res['ret'] != 0:
++        return False, {'msg': f"Failed to {reason}: {Result_Messages[res['ret']]}"}
++
++    try:
++        json_data = json.loads(res['message'])
++    except json.JSONDecodeError:
++        return False, {'msg': f"Failed to {reason}: invalid return message"}
++
++    return True, json_data
++
++
++def report_alarm_fail(alarm_info):
++    """report result to xalarmd"""
++    report_result(TASK_NAME, ResultLevel.FAIL, json.dumps({"msg": alarm_info}))
++    logging.critical(alarm_info)
++    sys.exit(1)
++
++
++def process_report_data(disk_name, rw, io_data):
++    """check abnormal window and report to xalarm"""
++    abnormal, abnormal_list = is_abnormal((disk_name, 'bio', rw), io_data)
++    if not abnormal:
++        return
++
++    msg = {
++        "alarm_source": TASK_NAME, "driver_name": disk_name, "io_type": rw,
++        "reason": "unknown",
"block_stack": "bio", "alarm_type": abnormal_list, ++ "details": get_win_data(disk_name, rw, io_data) ++ } ++ ++ # io press ++ ctrl_stage = ['throtl', 'wbt', 'iocost', 'bfq'] ++ for stage_name in ctrl_stage: ++ abnormal, abnormal_list = is_abnormal((disk_name, stage_name, rw), io_data) ++ if not abnormal: ++ continue ++ msg["reason"] = "IO press" ++ msg["block_stack"] = f"bio,{stage_name}" ++ msg["alarm_type"] = abnormal_list ++ log_slow_win(msg, "IO press") ++ xalarm_report(1002, MINOR_ALM, ALARM_TYPE_OCCUR, json.dumps(msg)) ++ return ++ ++ # driver slow ++ abnormal, abnormal_list = is_abnormal((disk_name, 'rq_driver', rw), io_data) ++ if abnormal: ++ msg["reason"] = "driver slow" ++ msg["block_stack"] = "bio,rq_driver" ++ msg["alarm_type"] = abnormal_list ++ log_slow_win(msg, "driver slow") ++ xalarm_report(1002, MINOR_ALM, ALARM_TYPE_OCCUR, json.dumps(msg)) ++ return ++ ++ # kernel slow ++ kernel_stage = ['gettag', 'plug', 'deadline', 'hctx', 'requeue'] ++ for stage_name in kernel_stage: ++ abnormal, abnormal_list = is_abnormal((disk_name, stage_name, rw), io_data) ++ if not abnormal: ++ continue ++ msg["reason"] = "kernel slow" ++ msg["block_stack"] = f"bio,{stage_name}" ++ msg["alarm_type"] = abnormal_list ++ log_slow_win(msg, "kernel slow") ++ xalarm_report(1002, MINOR_ALM, ALARM_TYPE_OCCUR, json.dumps(msg)) ++ return ++ ++ log_slow_win(msg, "unknown") ++ xalarm_report(1002, MINOR_ALM, ALARM_TYPE_OCCUR, json.dumps(msg)) ++ ++ ++def check_disk_list_validation(disk_list): ++ valid_disk_list = [] ++ for disk_name in disk_list: ++ is_success, _ = check_result_validation(get_disk_type(disk_name), "") ++ if not is_success: ++ continue ++ valid_disk_list.append(disk_name) ++ return valid_disk_list ++ ++ ++def get_disk_type_by_name(disk_name): ++ logging.debug(f"send to sentryCollector get_disk_type: disk_name={disk_name}") ++ is_success, disk_type_str = check_result_validation(get_disk_type(disk_name), f'Invalid disk type {disk_name}') ++ if not is_success: ++ report_alarm_fail(f"{disk_type_str['msg']}") ++ try: ++ curr_disk_type = int(disk_type_str) ++ if curr_disk_type not in Disk_Type: ++ raise ValueError ++ except ValueError: ++ report_alarm_fail(f"Failed to get disk type for {disk_name}") ++ ++ return Disk_Type[curr_disk_type] +diff --git a/src/python/sentryPlugins/avg_block_io/stage_window.py b/src/python/sentryPlugins/avg_block_io/stage_window.py +new file mode 100644 +index 0000000..587bd49 +--- /dev/null ++++ b/src/python/sentryPlugins/avg_block_io/stage_window.py +@@ -0,0 +1,55 @@ ++# coding: utf-8 ++# Copyright (c) 2024 Huawei Technologies Co., Ltd. ++# sysSentry is licensed under the Mulan PSL v2. ++# You can use this software according to the terms and conditions of the Mulan PSL v2. ++# You may obtain a copy of Mulan PSL v2 at: ++# http://license.coscl.org.cn/MulanPSL2 ++# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR ++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR ++# PURPOSE. ++# See the Mulan PSL v2 for more details. 
++ ++class AbnormalWindowBase: ++ def __init__(self, window_size=10, window_threshold=7): ++ self.window_size = window_size ++ self.window_threshold = window_threshold ++ self.abnormal_window = [False] * window_size ++ self.window_data = [-1] * window_size ++ ++ def append_new_data(self, ab_res): ++ self.window_data.pop(0) ++ self.window_data.append(ab_res) ++ ++ def append_new_period(self, ab_res, avg_val=0): ++ self.abnormal_window.pop(0) ++ if self.is_abnormal_period(ab_res, avg_val): ++ self.abnormal_window.append(True) ++ else: ++ self.abnormal_window.append(False) ++ ++ def is_abnormal_window(self): ++ return sum(self.abnormal_window) >= self.window_threshold ++ ++ def window_data_to_string(self): ++ return ",".join(str(x) for x in self.window_data) ++ ++ ++class IoWindow(AbnormalWindowBase): ++ def __init__(self, window_size=10, window_threshold=7, abnormal_multiple=5, abnormal_multiple_lim=30, abnormal_time=40): ++ super().__init__(window_size, window_threshold) ++ self.abnormal_multiple = abnormal_multiple ++ self.abnormal_multiple_lim = abnormal_multiple_lim ++ self.abnormal_time = abnormal_time ++ ++ def is_abnormal_period(self, value, avg_val): ++ return (value > avg_val * self.abnormal_multiple and value > self.abnormal_multiple_lim) or \ ++ (value > self.abnormal_time) ++ ++ ++class IoDumpWindow(AbnormalWindowBase): ++ def __init__(self, window_size=10, window_threshold=7, abnormal_time=40): ++ super().__init__(window_size, window_threshold) ++ self.abnormal_time = abnormal_time ++ ++ def is_abnormal_period(self, value, avg_val=0): ++ return value > self.abnormal_time +diff --git a/src/python/sentryPlugins/avg_block_io/utils.py b/src/python/sentryPlugins/avg_block_io/utils.py +new file mode 100644 +index 0000000..1bfd4e8 +--- /dev/null ++++ b/src/python/sentryPlugins/avg_block_io/utils.py +@@ -0,0 +1,140 @@ ++# coding: utf-8 ++# Copyright (c) 2024 Huawei Technologies Co., Ltd. ++# sysSentry is licensed under the Mulan PSL v2. ++# You can use this software according to the terms and conditions of the Mulan PSL v2. ++# You may obtain a copy of Mulan PSL v2 at: ++# http://license.coscl.org.cn/MulanPSL2 ++# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR ++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR ++# PURPOSE. ++# See the Mulan PSL v2 for more details. 
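Plugging in the latency_sata_hdd defaults from config.py above (read_avg_lim=15000, read_avg_time=3, read_tot_lim=50000), IoWindow.is_abnormal_period flags a period when the value exceeds both 3x the running average and 15000, or exceeds 50000 outright. A quick check, assuming the installed package layout from this patch:

    from sentryPlugins.avg_block_io.stage_window import IoWindow  # assumed import path

    win = IoWindow(window_size=30, window_threshold=6, abnormal_multiple=3,
                   abnormal_multiple_lim=15000, abnormal_time=50000)
    print(win.is_abnormal_period(16000, avg_val=5000))   # True: 16000 > 3*5000 and > 15000
    print(win.is_abnormal_period(12000, avg_val=1000))   # False: 12000 <= 15000 and <= 50000
    print(win.is_abnormal_period(60000, avg_val=60000))  # True: 60000 > abnormal_time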
++import logging ++import os ++ ++AVG_VALUE = 0 ++AVG_COUNT = 1 ++ ++ ++def get_nested_value(data, keys): ++ """get data from nested dict""" ++ for key in keys: ++ if key in data: ++ data = data[key] ++ else: ++ return None ++ return data ++ ++ ++def set_nested_value(data, keys, value): ++ """set data to nested dict""" ++ for key in keys[:-1]: ++ if key in data: ++ data = data[key] ++ else: ++ return False ++ data[keys[-1]] = value ++ return True ++ ++ ++def get_win_data(disk_name, rw, io_data): ++ """get latency and iodump win data""" ++ latency = '' ++ iodump = '' ++ for stage_name in io_data[disk_name]: ++ if 'latency' in io_data[disk_name][stage_name][rw]: ++ latency_list = io_data[disk_name][stage_name][rw]['latency'].window_data_to_string() ++ latency += f'{stage_name}: [{latency_list}], ' ++ if 'iodump' in io_data[disk_name][stage_name][rw]: ++ iodump_list = io_data[disk_name][stage_name][rw]['iodump'].window_data_to_string() ++ iodump += f'{stage_name}: [{iodump_list}], ' ++ return {"latency": latency[:-2], "iodump": iodump[:-2]} ++ ++ ++def is_abnormal(io_key, io_data): ++ """check if latency and iodump win abnormal""" ++ abnormal_list = '' ++ for key in ['latency', 'iodump']: ++ all_keys = get_nested_value(io_data, io_key) ++ if all_keys and key in all_keys: ++ win = get_nested_value(io_data, io_key + (key,)) ++ if win and win.is_abnormal_window(): ++ abnormal_list += key + ', ' ++ if not abnormal_list: ++ return False, abnormal_list ++ return True, abnormal_list[:-2] ++ ++ ++def update_io_avg(old_avg, period_value, win_size): ++ """update average of latency window""" ++ if old_avg[AVG_COUNT] < win_size: ++ new_avg_count = old_avg[AVG_COUNT] + 1 ++ new_avg_value = (old_avg[AVG_VALUE] * old_avg[AVG_COUNT] + period_value[0]) / new_avg_count ++ else: ++ new_avg_count = old_avg[AVG_COUNT] ++ new_avg_value = (old_avg[AVG_VALUE] * (old_avg[AVG_COUNT] - 1) + period_value[0]) / new_avg_count ++ return [new_avg_value, new_avg_count] ++ ++ ++def update_io_period(old_avg, period_value, io_data, io_key): ++ """update period of latency and iodump window""" ++ all_wins = get_nested_value(io_data, io_key) ++ if all_wins and "latency" in all_wins: ++ io_data[io_key[0]][io_key[1]][io_key[2]]["latency"].append_new_period(period_value[0], old_avg[AVG_VALUE]) ++ if all_wins and "iodump" in all_wins: ++ io_data[io_key[0]][io_key[1]][io_key[2]]["iodump"].append_new_period(period_value[1]) ++ ++ ++def update_io_data(period_value, io_data, io_key): ++ """update data of latency and iodump window""" ++ all_wins = get_nested_value(io_data, io_key) ++ if all_wins and "latency" in all_wins: ++ io_data[io_key[0]][io_key[1]][io_key[2]]["latency"].append_new_data(period_value[0]) ++ if all_wins and "iodump" in all_wins: ++ io_data[io_key[0]][io_key[1]][io_key[2]]["iodump"].append_new_data(period_value[1]) ++ ++ ++def log_abnormal_period(old_avg, period_value, io_data, io_key): ++ """record log of abnormal period""" ++ all_wins = get_nested_value(io_data, io_key) ++ if all_wins and "latency" in all_wins: ++ if all_wins["latency"].is_abnormal_period(period_value[0], old_avg[AVG_VALUE]): ++ logging.info(f"[abnormal_period] disk: {io_key[0]}, stage: {io_key[1]}, iotype: {io_key[2]}, " ++ f"type: latency, avg: {round(old_avg[AVG_VALUE], 3)}, curr_val: {period_value[0]}") ++ if all_wins and "iodump" in all_wins: ++ if all_wins["iodump"].is_abnormal_period(period_value[1]): ++ logging.info(f"[abnormal_period] disk: {io_key[0]}, stage: {io_key[1]}, iotype: {io_key[2]}, " ++ f"type: iodump, curr_val: {period_value[1]}") 
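get_nested_value and set_nested_value above treat the io_key tuple (disk, stage, iotype) as a path into the nested dicts built by init_io_win. A small sketch with placeholder data, assuming the installed package layout from this patch:

    from sentryPlugins.avg_block_io.utils import get_nested_value, set_nested_value  # assumed import path

    io_avg_value = {"sda": {"bio": {"read": [0.0, 0]}}}
    key = ("sda", "bio", "read")
    print(get_nested_value(io_avg_value, key))       # [0.0, 0]
    set_nested_value(io_avg_value, key, [12.5, 1])
    print(get_nested_value(io_avg_value, key))       # [12.5, 1]
    print(get_nested_value(io_avg_value, ("sdb",)))  # None: missing disk key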
++
++
++def log_slow_win(msg, reason):
++    """record log of slow win"""
++    logging.warning(f"[SLOW IO] disk: {msg['driver_name']}, stage: {msg['block_stack']}, "
++                    f"iotype: {msg['io_type']}, type: {msg['alarm_type']}, reason: {reason}")
++    logging.info(f"latency: {msg['details']['latency']}")
++    logging.info(f"iodump: {msg['details']['iodump']}")
++
++
++def update_avg_and_check_abnormal(data, io_key, win_size, io_avg_value, io_data):
++    """update avg and check abnormal, return True if win_size is full"""
++    period_value = get_nested_value(data, io_key)
++    old_avg = get_nested_value(io_avg_value, io_key)
++
++    # update the avg data
++    update_io_data(period_value, io_data, io_key)
++    if old_avg[AVG_COUNT] < win_size:
++        set_nested_value(io_avg_value, io_key, update_io_avg(old_avg, period_value, win_size))
++        return False
++
++    # log the abnormal period data
++    log_abnormal_period(old_avg, period_value, io_data, io_key)
++
++    # update the window data -- judge the abnormal period
++    update_io_period(old_avg, period_value, io_data, io_key)
++    all_wins = get_nested_value(io_data, io_key)
++    if not all_wins or 'latency' not in all_wins:
++        return True
++    period = get_nested_value(io_data, io_key + ("latency",))
++    if period and period.is_abnormal_period(period_value[0], old_avg[AVG_VALUE]):
++        return True
++    set_nested_value(io_avg_value, io_key, update_io_avg(old_avg, period_value, win_size))
++    return True
+-- 
+2.43.0
diff --git a/sysSentry.spec b/sysSentry.spec
index 634b3bf..1089f8d 100644
--- a/sysSentry.spec
+++ b/sysSentry.spec
@@ -4,7 +4,7 @@
 Summary: System Inspection Framework
 Name: sysSentry
 Version: 1.0.2
-Release: 29
+Release: 30
 License: Mulan PSL v2
 Group: System Environment/Daemons
 Source0: https://gitee.com/openeuler/sysSentry/releases/download/v%{version}/%{name}-%{version}.tar.gz
@@ -39,12 +39,14 @@ Patch26: hbm_online_repair-add-unload-driver.patch
 Patch27: add-pyxalarm-and-pySentryNotify-add-multi-users-supp.patch
 Patch28: adapt_5.10_kenel_for_syssentry.patch
 Patch29: collect-module-adapt-to-the-5.10-kernel.patch
+Patch30: add-avg_block_io-and-ai_block_io.patch
 
 BuildRequires: cmake gcc-c++
 BuildRequires: python3 python3-setuptools
 BuildRequires: json-c-devel
 BuildRequires: chrpath
 BuildRequires: elfutils-devel clang libbpf-devel bpftool
+BuildRequires: python3-numpy python3-pytest
 
 Requires: libxalarm = %{version}
 Requires: libbpf
@@ -68,6 +70,39 @@ Provides: libxalarm-devel = %{version}
 
 %description -n libxalarm-devel
 This package provides developer tools for the libxalarm.
+%package -n avg_block_io
+Summary: Supports slow I/O detection
+Requires: sysSentry = %{version}-%{release}
+Requires: pysentry_notify = %{version}-%{release}
+Requires: pysentry_collect = %{version}-%{release}
+
+%description -n avg_block_io
+This package provides slow I/O detection based on eBPF.
+
+%package -n ai_block_io
+Summary: Supports slow I/O detection
+Requires: python3-numpy
+Requires: sysSentry = %{version}-%{release}
+Requires: pysentry_notify = %{version}-%{release}
+Requires: pysentry_collect = %{version}-%{release}
+
+%description -n ai_block_io
+This package provides slow I/O detection based on AI.
+
+%package -n pyxalarm
+Summary: Supports the xalarm API in a Python implementation
+Requires: sysSentry = %{version}-%{release}
+
+%description -n pyxalarm
+This package provides the xalarm API for users.
+
+%package -n pysentry_notify
+Summary: Supports xalarm reporting in a Python implementation
+Requires: sysSentry = %{version}-%{release}
+
+%description -n pysentry_notify
+This package provides xalarm reporting for plugins.
+
 %package -n cpu_sentry
 Summary: CPU fault inspection program
 Requires: procps-ng
@@ -165,6 +200,14 @@ install src/c/hbm_online_repair/hbm_online_repair.env %{buildroot}/etc/sysconfig
 chrpath -d %{buildroot}%{_bindir}/cat-cli
 chrpath -d %{buildroot}%{_libdir}/libcpu_patrol.so
 
+# avg_block_io
+install config/tasks/avg_block_io.mod %{buildroot}/etc/sysSentry/tasks/
+install config/plugins/avg_block_io.ini %{buildroot}/etc/sysSentry/plugins/avg_block_io.ini
+
+# ai_block_io
+install config/tasks/ai_block_io.mod %{buildroot}/etc/sysSentry/tasks/
+install config/plugins/ai_block_io.ini %{buildroot}/etc/sysSentry/plugins/ai_block_io.ini
+
 # logrotate
 mkdir -p %{buildroot}%{_localstatedir}/lib/logrotate-syssentry
 mkdir -p %{buildroot}%{_sysconfdir}/cron.hourly
 install -m 0500 src/sh/logrotate-sysSentry.cron %{buildroot}%{_sysconfdir}/cron.
pushd src/python python3 setup.py install -O1 --root=$RPM_BUILD_ROOT --record=SENTRY_FILES +cat SENTRY_FILES | grep -v register_xalarm.* | grep -v sentry_notify.* > SENTRY_FILES.tmp +mv SENTRY_FILES.tmp SENTRY_FILES popd %pre @@ -221,6 +266,18 @@ rm -rf %{buildroot} %attr(0600,root,root) %config(noreplace) %{_sysconfdir}/sysSentry/xalarm.conf %attr(0600,root,root) %{_unitdir}/xalarmd.service +# avg block io +%exclude %{_sysconfdir}/sysSentry/tasks/avg_block_io.mod +%exclude %{_sysconfdir}/sysSentry/plugins/avg_block_io.ini +%exclude %{_bindir}/avg_block_io +%exclude %{python3_sitelib}/sentryPlugins/* + +# ai_block_io +%exclude %{_sysconfdir}/sysSentry/tasks/ai_block_io.mod +%exclude %{_sysconfdir}/sysSentry/plugins/ai_block_io.ini +%exclude %{_bindir}/ai_block_io +%exclude %{python3_sitelib}/sentryPlugins/* + # sentryCollector %attr(0550,root,root) %{_bindir}/sentryCollector %attr(0600,root,root) %{_sysconfdir}/sysSentry/collector.conf @@ -248,6 +305,23 @@ rm -rf %{buildroot} %exclude %{python3_sitelib}/syssentry/bmc_* %exclude %{python3_sitelib}/syssentry/*/bmc_* +%files -n avg_block_io +%attr(0500,root,root) %{_bindir}/avg_block_io +%attr(0600,root,root) %config(noreplace) %{_sysconfdir}/sysSentry/tasks/avg_block_io.mod +%attr(0600,root,root) %{_sysconfdir}/sysSentry/plugins/avg_block_io.ini +%attr(0550,root,root) %{python3_sitelib}/sentryPlugins/avg_block_io + +%files -n ai_block_io +%attr(0500,root,root) %{_bindir}/ai_block_io +%attr(0600,root,root) %config(noreplace) %{_sysconfdir}/sysSentry/tasks/ai_block_io.mod +%attr(0600,root,root) %{_sysconfdir}/sysSentry/plugins/ai_block_io.ini +%attr(0550,root,root) %{python3_sitelib}/sentryPlugins/ai_block_io + +# hbm repair module +%exclude %{_sysconfdir}/sysSentry/tasks/hbm_online_repair.mod +%exclude %{python3_sitelib}/syssentry/bmc_* +%exclude %{python3_sitelib}/syssentry/*/bmc_* + %files -n libxalarm %attr(0550,root,root) %{_libdir}/libxalarm.so @@ -256,6 +330,14 @@ rm -rf %{buildroot} %attr(0550,root,root) %{_includedir}/xalarm %attr(0550,root,root) %{_includedir}/xalarm/register_xalarm.h +%files -n pyxalarm +%attr(0550,root,root) %{python3_sitelib}/xalarm/register_xalarm.py +%attr(0550,root,root) %{python3_sitelib}/xalarm/__pycache__/register_xalarm* + +%files -n pysentry_notify +%attr(0550,root,root) %{python3_sitelib}/xalarm/sentry_notify.py +%attr(0550,root,root) %{python3_sitelib}/xalarm/__pycache__/sentry_notify* + %files -n cpu_sentry %attr(0500,root,root) %{_bindir}/cat-cli %attr(0500,root,root) %{_bindir}/cpu_sentry @@ -275,6 +357,12 @@ rm -rf %{buildroot} %attr(0550,root,root) %{python3_sitelib}/sentryCollector/__pycache__/collect_plugin* %changelog +* Sun Jan 26 2025 zhuofeng - 1.0.2-30 +- Type:bugfix +- CVE:NA +- SUG:NA +- DESC:add avg_block_io and ai_block_io + * Sun Jan 26 2025 zhuofeng - 1.0.2-29 - Type:bugfix - CVE:NA -- Gitee