From 72a0b186ac5f6ee57bc299bb2438d0c442fa9872 Mon Sep 17 00:00:00 2001 From: hewanhan Date: Mon, 3 Nov 2025 09:41:41 +0800 Subject: [PATCH] slow io plugin upgrade --- slow-io-plugin-upgrade.patch | 1816 ++++++++++++++++++++++++++++++++++ sysSentry.spec | 9 +- 2 files changed, 1824 insertions(+), 1 deletion(-) create mode 100644 slow-io-plugin-upgrade.patch diff --git a/slow-io-plugin-upgrade.patch b/slow-io-plugin-upgrade.patch new file mode 100644 index 0000000..3ff1ecb --- /dev/null +++ b/slow-io-plugin-upgrade.patch @@ -0,0 +1,1816 @@ +From 35cec134ee1cc8ab6d16550473de0d22a5c384b3 Mon Sep 17 00:00:00 2001 +From: hewanhan +Date: Fri, 31 Oct 2025 17:27:50 +0800 +Subject: [PATCH] slow io plugin upgrade + +--- + config/collector.conf | 3 + + config/plugins/ai_block_io.ini | 3 +- + config/plugins/avg_block_io.ini | 3 +- + .../pySentryCollector/collect_plugin.py | 40 ++-- + src/sentryPlugins/ai_block_io/ai_block_io.py | 54 +++-- + .../ai_block_io/config_parser.py | 33 ++- + src/sentryPlugins/ai_block_io/data_access.py | 47 ++++- + src/sentryPlugins/ai_block_io/detector.py | 71 ++++++- + src/sentryPlugins/ai_block_io/extra_logger.py | 186 ++++++++++++++++ + src/sentryPlugins/ai_block_io/io_data.py | 32 ++- + .../ai_block_io/sliding_window.py | 24 ++- + src/sentryPlugins/ai_block_io/utils.py | 13 +- + .../avg_block_io/avg_block_io.py | 30 ++- + src/sentryPlugins/avg_block_io/config.py | 30 ++- + .../avg_block_io/extra_logger.py | 199 ++++++++++++++++++ + src/sentryPlugins/avg_block_io/module_conn.py | 16 +- + .../avg_block_io/stage_window.py | 18 ++ + src/sentryPlugins/avg_block_io/utils.py | 29 ++- + .../sentryCollector/collect_config.py | 34 +++ + src/services/sentryCollector/collect_io.py | 86 +++++++- + .../sentryCollector/collect_server.py | 81 ++++--- + 21 files changed, 924 insertions(+), 108 deletions(-) + create mode 100644 src/sentryPlugins/ai_block_io/extra_logger.py + create mode 100644 src/sentryPlugins/avg_block_io/extra_logger.py + +diff --git a/config/collector.conf b/config/collector.conf +index 56b0ed1..8913530 100644 +--- a/config/collector.conf ++++ b/config/collector.conf +@@ -5,6 +5,9 @@ modules=io + period_time=1 + max_save=10 + disk=default ++nvme_ssd_threshold=1000 ++sata_ssd_threshold=1000 ++sata_hdd_threshold=1000 + + [log] + level=info +\ No newline at end of file +diff --git a/config/plugins/ai_block_io.ini b/config/plugins/ai_block_io.ini +index 53ac486..ca926ac 100644 +--- a/config/plugins/ai_block_io.ini ++++ b/config/plugins/ai_block_io.ini +@@ -14,7 +14,8 @@ algorithm_type=boxplot + boxplot_parameter=1.5 + win_type=not_continuous + win_size=30 +-win_threshold=6 ++win_threshold_latency=6 ++win_threshold_iodump=3 + + [latency_sata_ssd] + read_avg_lim=10000 +diff --git a/config/plugins/avg_block_io.ini b/config/plugins/avg_block_io.ini +index 3b4ee33..52c2ab9 100644 +--- a/config/plugins/avg_block_io.ini ++++ b/config/plugins/avg_block_io.ini +@@ -9,7 +9,8 @@ period_time=1 + + [algorithm] + win_size=30 +-win_threshold=6 ++win_threshold_latency=6 ++win_threshold_iodump=3 + + [latency_nvme_ssd] + read_avg_lim=10000 +diff --git a/src/libsentry/python/pySentryCollector/collect_plugin.py b/src/libsentry/python/pySentryCollector/collect_plugin.py +index 3395f89..e1befe7 100644 +--- a/src/libsentry/python/pySentryCollector/collect_plugin.py ++++ b/src/libsentry/python/pySentryCollector/collect_plugin.py +@@ -52,6 +52,7 @@ LIMIT_MAX_SAVE_LEN = 300 + class ClientProtocol(): + IS_IOCOLLECT_VALID = 0 + GET_IO_DATA = 1 ++ GET_IODUMP_DATA = 2 + PRO_END = 3 + + class 
ResultMessage(): +@@ -234,14 +235,8 @@ def inter_is_iocollect_valid(period, disk_list=None, stage=None): + result['message'] = result_message + return result + +-def get_io_data(period, disk_list, stage, iotype): +- result = inter_get_io_data(period, disk_list, stage, iotype) +- error_code = result['ret'] +- if error_code != ResultMessage.RESULT_SUCCEED: +- result['message'] = Result_Messages[error_code] +- return result + +-def inter_get_io_data(period, disk_list, stage, iotype): ++def inter_get_io_common(period, disk_list, stage, iotype, protocol): + result = {} + result['ret'] = ResultMessage.RESULT_UNKNOWN + result['message'] = "" +@@ -269,21 +264,21 @@ def inter_get_io_data(period, disk_list, stage, iotype): + return result + + req_msg_struct = { +- 'disk_list': json.dumps(disk_list), +- 'period': period, +- 'stage': json.dumps(stage), +- 'iotype' : json.dumps(iotype) +- } ++ 'disk_list': json.dumps(disk_list), ++ 'period': period, ++ 'stage': json.dumps(stage), ++ 'iotype': json.dumps(iotype) ++ } + + request_message = json.dumps(req_msg_struct) +- result_message = client_send_and_recv(request_message, CLT_MSG_LEN_LEN, ClientProtocol.GET_IO_DATA) ++ result_message = client_send_and_recv(request_message, CLT_MSG_LEN_LEN, protocol) + if not result_message: + logging.error("collect_plugin: client_send_and_recv failed") + return result + try: + json.loads(result_message) + except json.JSONDecodeError: +- logging.error("get_io_data: json decode error") ++ logging.error("get_io_common: json decode error") + result['ret'] = ResultMessage.RESULT_PARSE_FAILED + return result + +@@ -291,6 +286,23 @@ def inter_get_io_data(period, disk_list, stage, iotype): + result['message'] = result_message + return result + ++ ++def get_io_data(period, disk_list, stage, iotype): ++ result = inter_get_io_common(period, disk_list, stage, iotype, ClientProtocol.GET_IO_DATA) ++ error_code = result['ret'] ++ if error_code != ResultMessage.RESULT_SUCCEED: ++ result['message'] = Result_Messages[error_code] ++ return result ++ ++ ++def get_iodump_data(period, disk_list, stage, iotype): ++ result = inter_get_io_common(period, disk_list, stage, iotype, ClientProtocol.GET_IODUMP_DATA) ++ error_code = result['ret'] ++ if error_code != ResultMessage.RESULT_SUCCEED: ++ result['message'] = Result_Messages[error_code] ++ return result ++ ++ + def get_disk_type(disk): + result = {} + result['ret'] = ResultMessage.RESULT_UNKNOWN +diff --git a/src/sentryPlugins/ai_block_io/ai_block_io.py b/src/sentryPlugins/ai_block_io/ai_block_io.py +index f7b8226..2973d52 100644 +--- a/src/sentryPlugins/ai_block_io/ai_block_io.py ++++ b/src/sentryPlugins/ai_block_io/ai_block_io.py +@@ -14,19 +14,21 @@ import signal + import logging + from collections import defaultdict + +-from .detector import Detector, DiskDetector ++from .detector import Detector, DiskDetector, DataDetector + from .threshold import ThresholdFactory, ThresholdType +-from .sliding_window import SlidingWindowFactory ++from .sliding_window import SlidingWindowFactory, DataWindow + from .utils import get_data_queue_size_and_update_size + from .config_parser import ConfigParser + from .data_access import ( + get_io_data_from_collect_plug, ++ get_iodump_data_from_collect_plug, + check_collect_valid, + get_disk_type, + check_disk_is_available + ) + from .io_data import MetricName + from .alarm_report import Xalarm, Report ++from .extra_logger import extra_slow_log + + CONFIG_FILE = "/etc/sysSentry/plugins/ai_block_io.ini" + +@@ -93,6 +95,8 @@ class SlowIODetection: + for iotype 
in iotypes: + self._detector_name_list[disk].append(MetricName(disk, disk_type, stage, iotype, "latency")) + self._detector_name_list[disk].append(MetricName(disk, disk_type, stage, iotype, "io_dump")) ++ self._detector_name_list[disk].append(MetricName(disk, disk_type, stage, iotype, "iops")) ++ self._detector_name_list[disk].append(MetricName(disk, disk_type, stage, iotype, "iodump_data")) + + if not self._detector_name_list: + Report.report_pass("the disks to detection is empty, ai_block_io will exit.") +@@ -109,7 +113,7 @@ class SlowIODetection: + train_data_duration, train_update_duration, slow_io_detection_frequency + ) + sliding_window_type = self._config_parser.sliding_window_type +- window_size, window_threshold = ( ++ window_size, window_threshold_latency, window_threshold_iodump = ( + self._config_parser.get_window_size_and_window_minimum_threshold() + ) + +@@ -141,7 +145,7 @@ class SlowIODetection: + sliding_window = SlidingWindowFactory().get_sliding_window( + sliding_window_type, + queue_length=window_size, +- threshold=window_threshold, ++ threshold=window_threshold_latency, + abs_threshold=tot_lim, + avg_lim=avg_lim + ) +@@ -159,12 +163,27 @@ class SlowIODetection: + sliding_window = SlidingWindowFactory().get_sliding_window( + sliding_window_type, + queue_length=window_size, +- threshold=window_threshold ++ threshold=window_threshold_iodump + ) + detector = Detector(metric_name, threshold, sliding_window) + threshold.set_threshold(abs_threshold) + disk_detector.add_detector(detector) + ++ elif metric_name.metric_name == 'iops': ++ threshold = ThresholdFactory().get_threshold(ThresholdType.AbsoluteThreshold) ++ sliding_window = SlidingWindowFactory().get_sliding_window( ++ sliding_window_type, ++ queue_length=window_size, ++ threshold=window_threshold_latency ++ ) ++ detector = Detector(metric_name, threshold, sliding_window) ++ disk_detector.add_detector(detector) ++ ++ elif metric_name.metric_name == 'iodump_data': ++ data_window = DataWindow(window_size) ++ data_detector = DataDetector(metric_name, data_window) ++ disk_detector.add_data_detector(data_detector) ++ + logging.info(f"disk: [{disk}] add detector:\n [{disk_detector}]") + self._disk_detectors[disk] = disk_detector + +@@ -176,6 +195,9 @@ class SlowIODetection: + io_data_dict_with_disk_name = get_io_data_from_collect_plug( + self._config_parser.period_time, self._disk_list + ) ++ iodump_data_dict_with_disk_name = get_iodump_data_from_collect_plug( ++ self._config_parser.period_time, self._disk_list ++ ) + logging.debug(f"step1. Get io data: {str(io_data_dict_with_disk_name)}") + if io_data_dict_with_disk_name is None: + Report.report_pass( +@@ -187,9 +209,13 @@ class SlowIODetection: + logging.debug("step2. Start to detection slow io event.") + slow_io_event_list = [] + for disk, disk_detector in self._disk_detectors.items(): ++ disk_detector.push_data_to_data_detectors(iodump_data_dict_with_disk_name) + result = disk_detector.is_slow_io_event(io_data_dict_with_disk_name) + if result[0]: ++ # 产生告警时获取iodump的详细数据 ++ result[6]["iodump_data"] = disk_detector.get_data_detector_list_window() + slow_io_event_list.append(result) ++ + logging.debug("step2. 
End to detection slow io event.") + + # Step3:慢IO事件上报 +@@ -204,17 +230,17 @@ class SlowIODetection: + "alarm_type": slow_io_event[5], + "details": slow_io_event[6] + } +- Xalarm.major(alarm_content) +- tmp_alarm_content = alarm_content.copy() +- del tmp_alarm_content["details"] +- logging.warning("[SLOW IO] " + str(tmp_alarm_content)) +- logging.warning(f'[SLOW IO] disk: {str(tmp_alarm_content.get("driver_name"))}, ' +- f'stage: {str(tmp_alarm_content.get("block_stack"))}, ' +- f'iotype: {str(tmp_alarm_content.get("io_type"))}, ' +- f'type: {str(tmp_alarm_content.get("alarm_type"))}, ' +- f'reason: {str(tmp_alarm_content.get("reason"))}') ++ logging.warning(f'[SLOW IO] disk: {str(alarm_content.get("driver_name"))}, ' ++ f'stage: {str(alarm_content.get("block_stack"))}, ' ++ f'iotype: {str(alarm_content.get("io_type"))}, ' ++ f'type: {str(alarm_content.get("alarm_type"))}, ' ++ f'reason: {str(alarm_content.get("reason"))}') + logging.warning(f"latency: " + str(alarm_content.get("details").get("latency"))) + logging.warning(f"iodump: " + str(alarm_content.get("details").get("iodump"))) ++ logging.warning(f"iops: " + str(alarm_content.get("details").get("iops"))) ++ extra_slow_log(alarm_content) ++ del alarm_content["details"]["iodump_data"] # 极端场景下iodump_data可能过大,导致发送失败,所以只在日志中打印,不发送到告警模块 ++ Xalarm.major(alarm_content) + + # Step4:等待检测时间 + logging.debug("step4. Wait to start next slow io event detection loop.") +diff --git a/src/sentryPlugins/ai_block_io/config_parser.py b/src/sentryPlugins/ai_block_io/config_parser.py +index 612fe9f..b457e14 100644 +--- a/src/sentryPlugins/ai_block_io/config_parser.py ++++ b/src/sentryPlugins/ai_block_io/config_parser.py +@@ -17,9 +17,11 @@ from .alarm_report import Report + from .threshold import ThresholdType + from .utils import get_threshold_type_enum, get_sliding_window_type_enum, get_log_level + from .data_access import check_detect_frequency_is_valid ++from .extra_logger import init_extra_logger + + + LOG_FORMAT = "%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s" ++AI_EXTRA_LOG_PATH = "/var/log/sysSentry/ai_block_io_extra.log" + + ALL_STAGE_LIST = [ + "throtl", +@@ -52,6 +54,7 @@ def init_log_format(log_level: str): + logging.warning( + "the log_level: %s you set is invalid, use default value: info.", log_level + ) ++ init_extra_logger(AI_EXTRA_LOG_PATH, get_log_level(log_level.lower()), LOG_FORMAT) + + + class ConfigParser: +@@ -71,7 +74,8 @@ class ConfigParser: + "n_sigma_parameter": 3.0, + "win_type": get_sliding_window_type_enum("not_continuous"), + "win_size": 30, +- "win_threshold": 6, ++ "win_threshold_latency": 6, ++ "win_threshold_iodump": 3, + }, + "latency_sata_ssd": { + "read_avg_lim": 10000, +@@ -423,11 +427,11 @@ class ConfigParser: + ) + + def _read_window_minimum_threshold(self, items_sliding_window: dict): +- default_window_minimum_threshold = self.DEFAULT_CONF["algorithm"]["win_threshold"] +- self._conf["algorithm"]["win_threshold"] = ( ++ default_window_minimum_threshold = self.DEFAULT_CONF["algorithm"]["win_threshold_latency"] ++ self._conf["algorithm"]["win_threshold_latency"] = ( + self._get_config_value( + items_sliding_window, +- "win_threshold", ++ "win_threshold_latency", + int, + default_window_minimum_threshold, + gt=0, +@@ -495,6 +499,7 @@ class ConfigParser: + self._read_sliding_window_type(items_algorithm) + self._read_window_size(items_algorithm) + self._read_window_minimum_threshold(items_algorithm) ++ self._read_window_threshold_iodump(items_algorithm) + + if con.has_section("latency_sata_ssd"): 
+ items_latency_sata_ssd = dict(con.items("latency_sata_ssd")) +@@ -702,7 +707,8 @@ class ConfigParser: + def get_window_size_and_window_minimum_threshold(self): + return ( + self._conf["algorithm"]["win_size"], +- self._conf["algorithm"]["win_threshold"], ++ self._conf["algorithm"]["win_threshold_latency"], ++ self._conf["algorithm"]["win_threshold_iodump"], + ) + + @property +@@ -731,7 +737,7 @@ class ConfigParser: + + @property + def window_minimum_threshold(self): +- return self._conf["algorithm"]["win_threshold"] ++ return self._conf["algorithm"]["win_threshold_latency"] + + @property + def absolute_threshold(self): +@@ -767,4 +773,17 @@ class ConfigParser: + + @property + def write_iodump_lim(self): +- return self._conf["iodump"]["write_iodump_lim"] +\ No newline at end of file ++ return self._conf["iodump"]["write_iodump_lim"] ++ ++ def _read_window_threshold_iodump(self, items_sliding_window: dict): ++ default_window_threshold_iodump = self.DEFAULT_CONF["algorithm"]["win_threshold_iodump"] ++ self._conf["algorithm"]["win_threshold_iodump"] = ( ++ self._get_config_value( ++ items_sliding_window, ++ "win_threshold_iodump", ++ int, ++ default_window_threshold_iodump, ++ gt=0, ++ le=self._conf["algorithm"]["win_size"], ++ ) ++ ) +\ No newline at end of file +diff --git a/src/sentryPlugins/ai_block_io/data_access.py b/src/sentryPlugins/ai_block_io/data_access.py +index 2f2d607..f1c2bc2 100644 +--- a/src/sentryPlugins/ai_block_io/data_access.py ++++ b/src/sentryPlugins/ai_block_io/data_access.py +@@ -15,12 +15,13 @@ import logging + from sentryCollector.collect_plugin import ( + Result_Messages, + get_io_data, ++ get_iodump_data, + is_iocollect_valid, + get_disk_type + ) + + +-from .io_data import IOStageData, IOData ++from .io_data import IOStageData, IOData, IOStageDumpData, IODumpData + + COLLECT_STAGES = [ + "throtl", +@@ -33,6 +34,7 @@ COLLECT_STAGES = [ + "rq_driver", + "bio", + "iocost", ++ "deadline", + ] + + +@@ -125,3 +127,46 @@ def get_io_data_from_collect_plug(period, disk_list): + return ret + logging.warning(f'get io data failed with message: {data_raw["message"]}') + return None ++ ++ ++def _get_raw_iodump_data(period, disk_list): ++ return get_iodump_data( ++ period, ++ disk_list, ++ COLLECT_STAGES, ++ ["read", "write", "flush", "discard"], ++ ) ++ ++ ++def _get_iodump_stage_data(data): ++ io_stage_data = IOStageDumpData() ++ for data_type in ("read", "write", "flush", "discard"): ++ if data_type in data: ++ getattr(io_stage_data, data_type).iodump_data = data[data_type] ++ return io_stage_data ++ ++ ++def get_iodump_data_from_collect_plug(period, disk_list): ++ data_raw = _get_raw_iodump_data(period, disk_list) ++ if data_raw["ret"] == 0: ++ ret = {} ++ try: ++ data = json.loads(data_raw["message"]) ++ except json.decoder.JSONDecodeError as e: ++ logging.warning(f"get iodump data failed, {e}") ++ return None ++ ++ for disk in data: ++ disk_data = data[disk] ++ disk_ret = IODumpData() ++ for k, v in disk_data.items(): ++ try: ++ getattr(disk_ret, k) ++ setattr(disk_ret, k, _get_iodump_stage_data(v)) ++ except AttributeError: ++ logging.debug(f"no attr {k}") ++ continue ++ ret[disk] = disk_ret ++ return ret ++ logging.warning(f'get iodump data failed with message: {data_raw["message"]}') ++ return None +\ No newline at end of file +diff --git a/src/sentryPlugins/ai_block_io/detector.py b/src/sentryPlugins/ai_block_io/detector.py +index 2688cb1..6c0a03f 100644 +--- a/src/sentryPlugins/ai_block_io/detector.py ++++ b/src/sentryPlugins/ai_block_io/detector.py +@@ -13,8 +13,8 
@@ from datetime import datetime + + from .io_data import MetricName + from .threshold import Threshold +-from .sliding_window import SlidingWindow +-from .utils import get_metric_value_from_io_data_dict_by_metric_name ++from .sliding_window import SlidingWindow, DataWindow ++from .utils import get_metric_value_from_io_data_dict_by_metric_name, get_metric_value_from_iodump_data_dict + + + class Detector: +@@ -74,6 +74,35 @@ class Detector: + f' sliding_window_type: {self._slidingWindow}') + + ++class DataDetector: ++ ++ def __init__(self, metric_name: MetricName, data_window: DataWindow): ++ self._metric_name = metric_name ++ self._data_window = data_window ++ ++ def __repr__(self): ++ return (f'disk_name: {self._metric_name.disk_name}, stage_name: {self._metric_name.stage_name},' ++ f' io_type_name: {self._metric_name.io_access_type_name},' ++ f' metric_name: {self._metric_name.metric_name}') ++ ++ @property ++ def metric_name(self): ++ return self._metric_name ++ ++ def get_data_window_data(self): ++ return self._data_window.get_data() ++ ++ def push_data(self, iodump_data_dict_with_disk_name: dict): ++ logging.debug(f'enter Detector: {self}') ++ metric_value = get_metric_value_from_iodump_data_dict(iodump_data_dict_with_disk_name, self._metric_name) ++ if metric_value is None: ++ logging.debug('not found metric value, so return None.') ++ return False ++ logging.debug(f'input metric value: {str(metric_value)}') ++ self._data_window.push(metric_value) ++ return True ++ ++ + def set_to_str(parameter: set): + ret = "" + parameter = list(parameter) +@@ -91,19 +120,43 @@ class DiskDetector: + def __init__(self, disk_name: str): + self._disk_name = disk_name + self._detector_list = [] ++ self._data_detector_list = [] ++ ++ def __repr__(self): ++ msg = f'disk: {self._disk_name}, ' ++ for detector in self._detector_list: ++ msg += f'\n detector: [{detector}]' ++ for data_detector in self._data_detector_list: ++ msg += f'\n data_detector: [{data_detector}]' ++ return msg + + def add_detector(self, detector: Detector): + self._detector_list.append(detector) + ++ def add_data_detector(self, data_detector: DataDetector): ++ self._data_detector_list.append(data_detector) ++ + def get_detector_list_window(self): + latency_wins = {"read": {}, "write": {}} + iodump_wins = {"read": {}, "write": {}} ++ iops_wins = {"read": {}, "write": {}} + for detector in self._detector_list: + if detector.metric_name.metric_name == 'latency': + latency_wins[detector.metric_name.io_access_type_name][detector.metric_name.stage_name] = detector.get_sliding_window_data() + elif detector.metric_name.metric_name == 'io_dump': + iodump_wins[detector.metric_name.io_access_type_name][detector.metric_name.stage_name] = detector.get_sliding_window_data() +- return latency_wins, iodump_wins ++ elif detector.metric_name.metric_name == 'iops': ++ iops_wins[detector.metric_name.io_access_type_name][detector.metric_name.stage_name] =\ ++ detector.get_sliding_window_data() ++ return latency_wins, iodump_wins, iops_wins ++ ++ def get_data_detector_list_window(self): ++ iodump_data_wins = {"read": {}, "write": {}} ++ for data_detector in self._data_detector_list: ++ if data_detector.metric_name.metric_name == 'iodump_data': ++ iodump_data_wins[data_detector.metric_name.io_access_type_name][data_detector.metric_name.stage_name] =\ ++ data_detector.get_data_window_data() ++ return iodump_data_wins + + def is_slow_io_event(self, io_data_dict_with_disk_name: dict): + diagnosis_info = {"bio": [], "rq_driver": [], "kernel_stack": []} +@@ 
-134,8 +187,8 @@ class DiskDetector: + io_type.add(metric_name.io_access_type_name) + alarm_type.add(metric_name.metric_name) + +- latency_wins, iodump_wins = self.get_detector_list_window() +- details = {"latency": latency_wins, "iodump": iodump_wins} ++ latency_wins, iodump_wins, iops_wins = self.get_detector_list_window() ++ details = {"latency": latency_wins, "iodump": iodump_wins, "iops": iops_wins} + + io_press = {"throtl", "wbt", "iocost", "bfq"} + driver_slow = {"rq_driver"} +@@ -150,8 +203,6 @@ class DiskDetector: + + return True, driver_name, reason, set_to_str(block_stack), set_to_str(io_type), set_to_str(alarm_type), details + +- def __repr__(self): +- msg = f'disk: {self._disk_name}, ' +- for detector in self._detector_list: +- msg += f'\n detector: [{detector}]' +- return msg ++ def push_data_to_data_detectors(self, iodump_data_dict_with_disk_name: dict): ++ for data_detector in self._data_detector_list: ++ data_detector.push_data(iodump_data_dict_with_disk_name) +diff --git a/src/sentryPlugins/ai_block_io/extra_logger.py b/src/sentryPlugins/ai_block_io/extra_logger.py +new file mode 100644 +index 0000000..cfd1929 +--- /dev/null ++++ b/src/sentryPlugins/ai_block_io/extra_logger.py +@@ -0,0 +1,186 @@ ++# coding: utf-8 ++# Copyright (c) 2025 Huawei Technologies Co., Ltd. ++# sysSentry is licensed under the Mulan PSL v2. ++# You can use this software according to the terms and conditions of the Mulan PSL v2. ++# You may obtain a copy of Mulan PSL v2 at: ++# http://license.coscl.org.cn/MulanPSL2 ++# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR ++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR ++# PURPOSE. ++# See the Mulan PSL v2 for more details. 
++import logging ++import os ++import re ++ ++extra_logger = None ++ ++ ++# Define stage groups ++STAGE_GROUPS = { ++ 'B->Q': ['throtl', 'wbt', 'iocost'], ++ 'Q->G': ['gettag'], ++ 'G->I': ['plug'], ++ 'I->D': ['deadline', 'bfq', 'hctx', 'requeue'], ++ 'D->C': ['rq_driver'] ++} ++ ++ ++def init_extra_logger(log_path, log_level, log_format): ++ global extra_logger ++ try: ++ if not os.path.exists(log_path): ++ fd = os.open(log_path, os.O_CREAT | os.O_WRONLY, 0o600) ++ os.close(fd) ++ logger_name = f"extra_logger_{log_path}" ++ logger = logging.getLogger(logger_name) ++ logger.propagate = False ++ logger.setLevel(log_level) ++ ++ file_handler = logging.FileHandler(log_path) ++ file_handler.setLevel(log_level) ++ ++ formatter = logging.Formatter(log_format) ++ file_handler.setFormatter(formatter) ++ ++ logger.addHandler(file_handler) ++ extra_logger = logger ++ except Exception as e: ++ logging.error(f"Failed to create extra logger for {log_path}: {e}") ++ extra_logger = logging.getLogger() # Fallback to default logger ++ ++ ++def extra_slow_log(msg): ++ if "latency" in str(msg.get('alarm_type', '')): ++ extra_latency_log(msg) ++ if "io_dump" in str(msg.get('alarm_type', '')): ++ extra_iodump_log(msg) ++ ++ ++def extra_latency_log(msg): ++ io_types = [iot.strip() for iot in re.split(r',+', msg['io_type'])] ++ ++ # Calculate iops average ++ for io_type in io_types: ++ iops_avg = 0 ++ iops_data_dict = msg['details']['iops'].get(io_type, {}) ++ if 'rq_driver' in iops_data_dict: ++ iops_avg = sum(iops_data_dict['rq_driver']) / len(iops_data_dict['rq_driver']) ++ ++ extra_logger.warning(f"[SLOW IO] latency, disk:{msg['driver_name']}, iotype:{io_type}, iops:{int(iops_avg)}") ++ ++ # Calculate statistics for each group ++ latency_data_dict = msg['details']['latency'].get(io_type, {}) ++ group_stats = {} ++ for group_name, stages in STAGE_GROUPS.items(): ++ all_values = [] ++ for stage in stages: ++ if stage in latency_data_dict: ++ all_values.extend(latency_data_dict[stage]) ++ if all_values: ++ min_val = min(all_values) ++ max_val = max(all_values) ++ avg_val = sum(all_values) / len(all_values) ++ else: ++ min_val = 0 ++ max_val = 0 ++ avg_val = 0 ++ # Convert to ms ++ min_val_ms = min_val / 1000.0 ++ max_val_ms = max_val / 1000.0 ++ avg_val_ms = avg_val / 1000.0 ++ group_stats[group_name] = { ++ 'min': min_val_ms, ++ 'max': max_val_ms, ++ 'avg': avg_val_ms ++ } ++ ++ # Calculate total latency (B->C) ++ total_avg = 0 ++ total_min = 0 ++ total_max = 0 ++ for group_name in STAGE_GROUPS: ++ total_avg += group_stats[group_name]['avg'] ++ total_min += group_stats[group_name]['min'] ++ total_max += group_stats[group_name]['max'] ++ group_stats['B->C'] = { ++ 'min': total_min, ++ 'max': total_max, ++ 'avg': total_avg ++ } ++ ++ # Calculate PCT for each group (except B->C) ++ for group_name in STAGE_GROUPS: ++ if total_avg > 0: ++ pct = (group_stats[group_name]['avg'] / total_avg) * 100 ++ else: ++ pct = 0 ++ group_stats[group_name]['pct'] = pct ++ group_stats['B->C']['pct'] = 100.0 ++ ++ # Output table ++ stage_order = ['B->Q', 'Q->G', 'G->I', 'I->D', 'D->C', 'B->C'] ++ stage_width = 7 ++ num_width = 12 ++ pct_width = 8 ++ ++ extra_logger.warning( ++ f"{'Stage':<{stage_width}} " ++ f"{'Min(ms)':>{num_width}} " ++ f"{'Max(ms)':>{num_width}} " ++ f"{'Avg(ms)':>{num_width}} " ++ f"{'PCT':>{pct_width}}" ++ ) ++ ++ for stage in stage_order: ++ try: ++ s = group_stats[stage] ++ min_str = f"{s['min']:>.3f}" ++ max_str = f"{s['max']:>.3f}" ++ avg_str = f"{s['avg']:>.3f}" ++ pct_str = f"{s['pct']:.2f}%" ++ ++ 
extra_logger.warning( ++ f"{stage:<{stage_width}} " ++ f"{min_str:>{num_width}} " ++ f"{max_str:>{num_width}} " ++ f"{avg_str:>{num_width}} " ++ f"{pct_str:>{pct_width}}" ++ ) ++ except KeyError: ++ return ++ ++ ++def extra_iodump_log(msg): ++ io_types = [iot.strip() for iot in re.split(r',+', msg['io_type'])] ++ ++ for io_type in io_types: ++ extra_logger.warning(f"[SLOW IO] iodump, disk:{msg['driver_name']}, iotype:{io_type}") ++ iodump_data = msg['details']['iodump_data'].get(io_type, {}) ++ ++ try: ++ bio_data = iodump_data['bio'] ++ except Exception as e: ++ extra_logger.error(f"Failed to parse iodump data: {e}") ++ return ++ ++ stack_to_stage = {} ++ for stage, stacks in STAGE_GROUPS.items(): ++ for stack in stacks: ++ stack_to_stage[stack] = stage ++ ++ last_bio_record = {} ++ for window in bio_data: ++ for entry in window: ++ parts = entry.split(',') ++ task_name, pid, io_stack, bio_ptr, start_ago = parts ++ if io_stack in stack_to_stage: ++ stage = stack_to_stage[io_stack] ++ last_bio_record[bio_ptr] = (task_name, pid, io_stack, stage, bio_ptr, start_ago) ++ ++ header = f"{'TASK_NAME':<18} {'PID':>8} {'IO_STACK':<12} {'STAGE':<8} {'BIO_PTR':<20} {'START_AGO(ms)':>10}" ++ extra_logger.warning(header) ++ ++ for bio_ptr in last_bio_record: ++ task_name, pid, io_stack, stage, bio_ptr, start_ago = last_bio_record[bio_ptr] ++ line = f"{task_name:<18} {pid:>8} {io_stack:<12} {stage:<8} {bio_ptr:<20} {start_ago:>10}" ++ extra_logger.warning(line) +\ No newline at end of file +diff --git a/src/sentryPlugins/ai_block_io/io_data.py b/src/sentryPlugins/ai_block_io/io_data.py +index 6042911..023e7b1 100644 +--- a/src/sentryPlugins/ai_block_io/io_data.py ++++ b/src/sentryPlugins/ai_block_io/io_data.py +@@ -11,7 +11,7 @@ + + from dataclasses import dataclass, field + from datetime import datetime +-from typing import Optional ++from typing import Optional, List + + + @dataclass +@@ -42,6 +42,36 @@ class IOData: + requeue: IOStageData = field(default_factory=lambda: IOStageData()) + rq_driver: IOStageData = field(default_factory=lambda: IOStageData()) + bio: IOStageData = field(default_factory=lambda: IOStageData()) ++ deadline: IOStageData = field(default_factory=lambda: IOStageData()) ++ time_stamp: float = field(default_factory=lambda: datetime.now().timestamp()) ++ ++ ++@dataclass ++class IoDumpListData: ++ iodump_data: List[str] = field(default_factory=list) ++ ++ ++@dataclass ++class IOStageDumpData: ++ read: IoDumpListData = field(default_factory=lambda: IoDumpListData()) ++ write: IoDumpListData = field(default_factory=lambda: IoDumpListData()) ++ flush: IoDumpListData = field(default_factory=lambda: IoDumpListData()) ++ discard: IoDumpListData = field(default_factory=lambda: IoDumpListData()) ++ ++ ++@dataclass ++class IODumpData: ++ throtl: IOStageDumpData = field(default_factory=lambda: IOStageDumpData()) ++ wbt: IOStageDumpData = field(default_factory=lambda: IOStageDumpData()) ++ gettag: IOStageDumpData = field(default_factory=lambda: IOStageDumpData()) ++ iocost: IOStageDumpData = field(default_factory=lambda: IOStageDumpData()) ++ plug: IOStageDumpData = field(default_factory=lambda: IOStageDumpData()) ++ bfq: IOStageDumpData = field(default_factory=lambda: IOStageDumpData()) ++ hctx: IOStageDumpData = field(default_factory=lambda: IOStageDumpData()) ++ requeue: IOStageDumpData = field(default_factory=lambda: IOStageDumpData()) ++ rq_driver: IOStageDumpData = field(default_factory=lambda: IOStageDumpData()) ++ bio: IOStageDumpData = field(default_factory=lambda: IOStageDumpData()) 
++ deadline: IOStageDumpData = field(default_factory=lambda: IOStageDumpData()) + time_stamp: float = field(default_factory=lambda: datetime.now().timestamp()) + + +diff --git a/src/sentryPlugins/ai_block_io/sliding_window.py b/src/sentryPlugins/ai_block_io/sliding_window.py +index b174d94..6881baa 100644 +--- a/src/sentryPlugins/ai_block_io/sliding_window.py ++++ b/src/sentryPlugins/ai_block_io/sliding_window.py +@@ -10,6 +10,7 @@ + # See the Mulan PSL v2 for more details. + + from enum import Enum, unique ++from typing import Any + import numpy as np + + +@@ -33,10 +34,8 @@ class SlidingWindow: + def is_abnormal(self, data): + if self._avg_lim is not None and data < self._avg_lim: + return False +- if self._avg_lim is not None and self._ai_threshold is not None: +- threshold = max(self._avg_lim, self._ai_threshold) +- if data > threshold: +- return True ++ if self._ai_threshold is not None and data > self._ai_threshold: ++ return True + if self._abs_threshold is not None and data > self._abs_threshold: + return True + return False +@@ -130,3 +129,20 @@ class SlidingWindowFactory: + return MedianSlidingWindow(*args, **kwargs) + else: + return NotContinuousSlidingWindow(*args, **kwargs) ++ ++ ++class DataWindow: ++ def __init__(self, window_size: int): ++ self._window_size = window_size ++ self._data_queue = [] ++ ++ def __repr__(self): ++ return f"[SingleDataWindow, window size: {self._window_size}]" ++ ++ def push(self, data: Any): ++ if len(self._data_queue) == self._window_size: ++ self._data_queue.pop(0) ++ self._data_queue.append(data) ++ ++ def get_data(self): ++ return self._data_queue +\ No newline at end of file +diff --git a/src/sentryPlugins/ai_block_io/utils.py b/src/sentryPlugins/ai_block_io/utils.py +index 7d2390b..919cf9b 100644 +--- a/src/sentryPlugins/ai_block_io/utils.py ++++ b/src/sentryPlugins/ai_block_io/utils.py +@@ -15,7 +15,7 @@ from dataclasses import asdict + + from .threshold import ThresholdType + from .sliding_window import SlidingWindowType +-from .io_data import MetricName, IOData ++from .io_data import MetricName, IOData, IODumpData + + + def get_threshold_type_enum(algorithm_type: str): +@@ -49,6 +49,17 @@ def get_metric_value_from_io_data_dict_by_metric_name( + return None + + ++def get_metric_value_from_iodump_data_dict(io_dump_data_dict: dict, metric_name: MetricName): ++ try: ++ io_dump_data: IODumpData = io_dump_data_dict[metric_name.disk_name] ++ io_dump_stage_data = asdict(io_dump_data)[metric_name.stage_name] ++ base_data = io_dump_stage_data[metric_name.io_access_type_name] ++ metric_value = base_data[metric_name.metric_name] ++ return metric_value ++ except KeyError: ++ return None ++ ++ + def get_data_queue_size_and_update_size( + training_data_duration: float, + train_update_duration: float, +diff --git a/src/sentryPlugins/avg_block_io/avg_block_io.py b/src/sentryPlugins/avg_block_io/avg_block_io.py +index 899d517..ef19b7b 100644 +--- a/src/sentryPlugins/avg_block_io/avg_block_io.py ++++ b/src/sentryPlugins/avg_block_io/avg_block_io.py +@@ -14,11 +14,13 @@ import configparser + import time + + from .config import read_config_log, read_config_common, read_config_algorithm, read_config_latency, read_config_iodump, read_config_stage +-from .stage_window import IoWindow, IoDumpWindow +-from .module_conn import avg_is_iocollect_valid, avg_get_io_data, report_alarm_fail, process_report_data, sig_handler, get_disk_type_by_name, check_disk_list_validation +-from .utils import update_avg_and_check_abnormal ++from .stage_window import IoWindow, 
IoDumpWindow,IopsWindow,IodumpMsgWindow ++from .module_conn import avg_is_iocollect_valid, avg_get_io_data, avg_get_iodump_data, report_alarm_fail, process_report_data, sig_handler, get_disk_type_by_name, check_disk_list_validation ++from .utils import update_avg_and_check_abnormal, update_avg_iodump_data ++from .extra_logger import init_extra_logger + + CONFIG_FILE = "/etc/sysSentry/plugins/avg_block_io.ini" ++AVG_EXTRA_LOG_PATH = "/var/log/sysSentry/avg_block_io_extra.log" + + + def init_io_win(io_dic, config, common_param): +@@ -52,12 +54,23 @@ def init_io_win(io_dic, config, common_param): + iodump_lim_value = curr_stage_param.get(iodump_lim_key, common_param.get("iodump", {}).get(iodump_lim_key)) + + if avg_lim_value and avg_time_value and tot_lim_value: +- io_data[disk_name][stage_name][rw]["latency"] = IoWindow(window_size=io_dic["win_size"], window_threshold=io_dic["win_threshold"], abnormal_multiple=avg_time_value, abnormal_multiple_lim=avg_lim_value, abnormal_time=tot_lim_value) ++ io_data[disk_name][stage_name][rw]["latency"] = \ ++ IoWindow(window_size=io_dic["win_size"], window_threshold=io_dic["win_threshold_latency"], \ ++ abnormal_multiple=avg_time_value, abnormal_multiple_lim=avg_lim_value, \ ++ abnormal_time=tot_lim_value) + logging.debug("Successfully create {}-{}-{}-latency window".format(disk_name, stage_name, rw)) + + if iodump_lim_value is not None: +- io_data[disk_name][stage_name][rw]["iodump"] = IoDumpWindow(window_size=io_dic["win_size"], window_threshold=io_dic["win_threshold"], abnormal_time=iodump_lim_value) ++ io_data[disk_name][stage_name][rw]["iodump"] =\ ++ IoDumpWindow(window_size=io_dic["win_size"], window_threshold=io_dic["win_threshold_iodump"],\ ++ abnormal_time=iodump_lim_value) + logging.debug("Successfully create {}-{}-{}-iodump window".format(disk_name, stage_name, rw)) ++ ++ io_data[disk_name][stage_name][rw]["iops"] = IopsWindow(window_size=io_dic["win_size"]) ++ logging.debug("Successfully create {}-{}-{}-iops window".format(disk_name, stage_name, rw)) ++ ++ io_data[disk_name][stage_name][rw]["iodump_data"] = IodumpMsgWindow(window_size=io_dic["win_size"]) ++ logging.debug("Successfully create {}-{}-{}-iodump_data window".format(disk_name, stage_name, rw)) + return io_data, io_avg_value + + +@@ -124,6 +137,9 @@ def main_loop(io_dic, io_data, io_avg_value): + logging.error(f"{curr_period_data['msg']}") + continue + ++ # 获取iodump的详细信息 ++ is_success, iodump_data = avg_get_iodump_data(io_dic) ++ + # 处理周期数据 + reach_size = False + for disk_name in disk_list: +@@ -132,6 +148,7 @@ def main_loop(io_dic, io_data, io_avg_value): + if disk_name in curr_period_data and stage_name in curr_period_data[disk_name] and rw in curr_period_data[disk_name][stage_name]: + io_key = (disk_name, stage_name, rw) + reach_size = update_avg_and_check_abnormal(curr_period_data, io_key, win_size, io_avg_value, io_data) ++ update_avg_iodump_data(iodump_data, is_success, io_key, io_data) + + # win_size不满时不进行告警判断 + if not reach_size: +@@ -152,6 +169,7 @@ def main(): + log_level = read_config_log(CONFIG_FILE) + log_format = "%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s" + logging.basicConfig(level=log_level, format=log_format) ++ init_extra_logger(AVG_EXTRA_LOG_PATH, log_level, log_format) + + # 初始化配置读取 + config = configparser.ConfigParser(comment_prefixes=('#', ';')) +@@ -175,7 +193,7 @@ def main(): + + # 初始化窗口 -- config读取,对应is_iocollect_valid返回的结果 + # step1. 
解析公共配置 --- algorithm +- io_dic["win_size"], io_dic["win_threshold"] = read_config_algorithm(config) ++ io_dic["win_size"], io_dic["win_threshold_latency"], io_dic["win_threshold_iodump"] = read_config_algorithm(config) + + # step2. 解析公共配置 --- latency_xxx + common_param = read_config_latency(config) +diff --git a/src/sentryPlugins/avg_block_io/config.py b/src/sentryPlugins/avg_block_io/config.py +index c1e8ab1..79bd21a 100644 +--- a/src/sentryPlugins/avg_block_io/config.py ++++ b/src/sentryPlugins/avg_block_io/config.py +@@ -24,7 +24,8 @@ CONF_COMMON_PER_TIME = 'period_time' + + CONF_ALGO = 'algorithm' + CONF_ALGO_SIZE = 'win_size' +-CONF_ALGO_THRE = 'win_threshold' ++CONF_ALGO_THRE_LATENCY = 'win_threshold_latency' ++CONF_ALGO_THRE_IODUMP = 'win_threshold_iodump' + + CONF_LATENCY = 'latency_{}' + CONF_IODUMP = 'iodump' +@@ -40,7 +41,8 @@ DEFAULT_PARAM = { + CONF_COMMON_PER_TIME: 1 + }, CONF_ALGO: { + CONF_ALGO_SIZE: 30, +- CONF_ALGO_THRE: 6 ++ CONF_ALGO_THRE_LATENCY: 6, ++ CONF_ALGO_THRE_IODUMP: 3 + }, 'latency_nvme_ssd': { + 'read_avg_lim': 10000, + 'write_avg_lim': 10000, +@@ -162,16 +164,26 @@ def read_config_algorithm(config): + logging.warning(f"Unset {CONF_ALGO}.{CONF_ALGO_SIZE}, use {win_size} as default") + + try: +- win_threshold = int(config.get(CONF_ALGO, CONF_ALGO_THRE)) +- if win_threshold < 1 or win_threshold > 300 or win_threshold > win_size: +- raise ValueError(f"Invalid {CONF_ALGO}.{CONF_ALGO_THRE}") ++ win_threshold_latency = int(config.get(CONF_ALGO, CONF_ALGO_THRE_LATENCY)) ++ if win_threshold_latency < 1 or win_threshold_latency > 300 or win_threshold_latency > win_size: ++ raise ValueError(f"Invalid {CONF_ALGO}.{CONF_ALGO_THRE_LATENCY}") + except ValueError: +- report_alarm_fail(f"Invalid {CONF_ALGO}.{CONF_ALGO_THRE} config") ++ report_alarm_fail(f"Invalid {CONF_ALGO}.{CONF_ALGO_THRE_LATENCY} config") + except configparser.NoOptionError: +- win_threshold = DEFAULT_PARAM[CONF_ALGO]['win_threshold'] +- logging.warning(f"Unset {CONF_ALGO}.{CONF_ALGO_THRE}, use {win_threshold} as default") ++ win_threshold_latency = DEFAULT_PARAM[CONF_ALGO]['win_threshold_latency'] ++ logging.warning(f"Unset {CONF_ALGO}.{CONF_ALGO_THRE_LATENCY}, use {win_threshold_latency} as default") + +- return win_size, win_threshold ++ try: ++ win_threshold_iodump = int(config.get(CONF_ALGO, CONF_ALGO_THRE_IODUMP)) ++ if win_threshold_iodump < 1 or win_threshold_iodump > 300 or win_threshold_iodump > win_size: ++ raise ValueError(f"Invalid {CONF_ALGO}.{CONF_ALGO_THRE_IODUMP}") ++ except ValueError: ++ report_alarm_fail(f"Invalid {CONF_ALGO}.{CONF_ALGO_THRE_IODUMP} config") ++ except configparser.NoOptionError: ++ win_threshold_iodump = DEFAULT_PARAM[CONF_ALGO][CONF_ALGO_THRE_IODUMP] ++ logging.warning(f"Unset {CONF_ALGO}.{CONF_ALGO_THRE_IODUMP}, use {win_threshold_iodump} as default") ++ ++ return win_size, win_threshold_latency, win_threshold_iodump + + + def read_config_latency(config): +diff --git a/src/sentryPlugins/avg_block_io/extra_logger.py b/src/sentryPlugins/avg_block_io/extra_logger.py +new file mode 100644 +index 0000000..ac86306 +--- /dev/null ++++ b/src/sentryPlugins/avg_block_io/extra_logger.py +@@ -0,0 +1,199 @@ ++# coding: utf-8 ++# Copyright (c) 2025 Huawei Technologies Co., Ltd. ++# sysSentry is licensed under the Mulan PSL v2. ++# You can use this software according to the terms and conditions of the Mulan PSL v2. 
++# You may obtain a copy of Mulan PSL v2 at: ++# http://license.coscl.org.cn/MulanPSL2 ++# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR ++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR ++# PURPOSE. ++# See the Mulan PSL v2 for more details. ++import logging ++import os ++import re ++import ast ++ ++extra_logger = None ++ ++ ++# Define stage groups ++STAGE_GROUPS = { ++ 'B->Q': ['throtl', 'wbt', 'iocost'], ++ 'Q->G': ['gettag'], ++ 'G->I': ['plug'], ++ 'I->D': ['deadline', 'bfq', 'hctx', 'requeue'], ++ 'D->C': ['rq_driver'] ++} ++ ++PATTERN = re.compile(r'(\w+):\s*\[([0-9.,]+)\]') ++ ++ ++def init_extra_logger(log_path, log_level, log_format): ++ global extra_logger ++ try: ++ if not os.path.exists(log_path): ++ fd = os.open(log_path, os.O_CREAT | os.O_WRONLY, 0o600) ++ os.close(fd) ++ logger_name = f"extra_logger_{log_path}" ++ logger = logging.getLogger(logger_name) ++ logger.propagate = False ++ logger.setLevel(log_level) ++ ++ file_handler = logging.FileHandler(log_path) ++ file_handler.setLevel(log_level) ++ ++ formatter = logging.Formatter(log_format) ++ file_handler.setFormatter(formatter) ++ ++ logger.addHandler(file_handler) ++ extra_logger = logger ++ except Exception as e: ++ logging.error(f"Failed to create extra logger for {log_path}: {e}") ++ extra_logger = logging.getLogger() # Fallback to default logger ++ ++ ++def extra_slow_log(msg): ++ if "latency" in str(msg.get('alarm_type', '')): ++ extra_latency_log(msg) ++ if "iodump" in str(msg.get('alarm_type', '')): ++ extra_iodump_log(msg) ++ ++ ++def extra_latency_log(msg): ++ # Parse the iops string from msg ++ iops_avg = 0 ++ iops_str = msg['details']['iops'] ++ iops_matches = re.findall(PATTERN, iops_str) ++ iops_data = {} ++ for match in iops_matches: ++ key = match[0] ++ values = list(map(float, match[1].split(','))) ++ iops_data[key] = values ++ if 'rq_driver' in iops_data and iops_data['rq_driver']: ++ iops_avg = sum(iops_data['rq_driver']) / len(iops_data['rq_driver']) ++ ++ extra_logger.warning(f"[SLOW IO] alarm_type: latency, disk: {msg['driver_name']}, " ++ f"iotype: {msg['io_type']}, iops: {int(iops_avg)}") ++ ++ # Parse the latency string from msg ++ latency_str = msg['details']['latency'] ++ latency_matches = re.findall(PATTERN, latency_str) ++ latency_data = {} ++ for match in latency_matches: ++ key = match[0] ++ values = list(map(float, match[1].split(','))) ++ latency_data[key] = values ++ ++ # Calculate statistics for each group ++ group_stats = {} ++ for group_name, stages in STAGE_GROUPS.items(): ++ all_values = [] ++ for stage in stages: ++ if stage in latency_data: ++ all_values.extend(latency_data[stage]) ++ if all_values: ++ min_val = min(all_values) ++ max_val = max(all_values) ++ avg_val = sum(all_values) / len(all_values) ++ else: ++ min_val = 0 ++ max_val = 0 ++ avg_val = 0 ++ # Convert to ms ++ min_val_ms = min_val / 1000.0 ++ max_val_ms = max_val / 1000.0 ++ avg_val_ms = avg_val / 1000.0 ++ group_stats[group_name] = { ++ 'min': min_val_ms, ++ 'max': max_val_ms, ++ 'avg': avg_val_ms ++ } ++ ++ # Calculate total latency (B->C) ++ total_avg = 0 ++ total_min = 0 ++ total_max = 0 ++ for group_name in STAGE_GROUPS: ++ total_avg += group_stats[group_name]['avg'] ++ total_min += group_stats[group_name]['min'] ++ total_max += group_stats[group_name]['max'] ++ group_stats['B->C'] = { ++ 'min': total_min, ++ 'max': total_max, ++ 'avg': total_avg ++ } ++ ++ # Calculate PCT for each group (except B->C) ++ for 
group_name in STAGE_GROUPS: ++ if total_avg > 0: ++ pct = (group_stats[group_name]['avg'] / total_avg) * 100 ++ else: ++ pct = 0 ++ group_stats[group_name]['pct'] = pct ++ group_stats['B->C']['pct'] = 100.0 ++ ++ # Output table ++ stage_order = ['B->Q', 'Q->G', 'G->I', 'I->D', 'D->C', 'B->C'] ++ stage_width = 7 ++ num_width = 12 ++ pct_width = 8 ++ ++ extra_logger.warning( ++ f"{'Stage':<{stage_width}} " ++ f"{'Min(ms)':>{num_width}} " ++ f"{'Max(ms)':>{num_width}} " ++ f"{'Avg(ms)':>{num_width}} " ++ f"{'PCT':>{pct_width}}" ++ ) ++ ++ for stage in stage_order: ++ try: ++ s = group_stats[stage] ++ min_str = f"{s['min']:>.3f}" ++ max_str = f"{s['max']:>.3f}" ++ avg_str = f"{s['avg']:>.3f}" ++ pct_str = f"{s['pct']:.2f}%" ++ ++ extra_logger.warning( ++ f"{stage:<{stage_width}} " ++ f"{min_str:>{num_width}} " ++ f"{max_str:>{num_width}} " ++ f"{avg_str:>{num_width}} " ++ f"{pct_str:>{pct_width}}" ++ ) ++ except KeyError: ++ return ++ ++ ++def extra_iodump_log(msg): ++ extra_logger.warning(f"[SLOW IO] iodump, disk:{msg['driver_name']}, iotype:{msg['io_type']}") ++ iodump_str = msg['details']['iodump_data'] ++ ++ try: ++ iodump_data = ast.literal_eval(iodump_str) ++ bio_data = iodump_data['bio'] ++ except Exception as e: ++ extra_logger.error(f"Failed to parse iodump data: {e}") ++ return ++ ++ stack_to_stage = {} ++ for stage, stacks in STAGE_GROUPS.items(): ++ for stack in stacks: ++ stack_to_stage[stack] = stage ++ ++ last_bio_record = {} ++ for window in bio_data: ++ for entry in window: ++ parts = entry.split(',') ++ task_name, pid, io_stack, bio_ptr, start_ago = parts ++ if io_stack in stack_to_stage: ++ stage = stack_to_stage[io_stack] ++ last_bio_record[bio_ptr] = (task_name, pid, io_stack, stage, bio_ptr, start_ago) ++ ++ header = f"{'TASK_NAME':<18} {'PID':>8} {'IO_STACK':<12} {'STAGE':<8} {'BIO_PTR':<20} {'START_AGO(ms)':>10}" ++ extra_logger.warning(header) ++ ++ for bio_ptr in last_bio_record: ++ task_name, pid, io_stack, stage, bio_ptr, start_ago = last_bio_record[bio_ptr] ++ line = f"{task_name:<18} {pid:>8} {io_stack:<12} {stage:<8} {bio_ptr:<20} {start_ago:>10}" ++ extra_logger.warning(line) +\ No newline at end of file +diff --git a/src/sentryPlugins/avg_block_io/module_conn.py b/src/sentryPlugins/avg_block_io/module_conn.py +index bc10802..7e9b065 100644 +--- a/src/sentryPlugins/avg_block_io/module_conn.py ++++ b/src/sentryPlugins/avg_block_io/module_conn.py +@@ -12,10 +12,10 @@ import json + import logging + import sys + +-from .utils import is_abnormal, get_win_data, log_slow_win +-from sentryCollector.collect_plugin import is_iocollect_valid, get_io_data, Result_Messages, get_disk_type, Disk_Type ++from sentryCollector.collect_plugin import is_iocollect_valid, get_io_data, get_iodump_data, Result_Messages, get_disk_type, Disk_Type + from syssentry.result import ResultLevel, report_result + from xalarm.sentry_notify import xalarm_report, MINOR_ALM, ALARM_TYPE_OCCUR ++from .utils import is_abnormal, get_win_data, log_slow_win + + + TASK_NAME = "avg_block_io" +@@ -34,6 +34,14 @@ def avg_get_io_data(io_dic): + return check_result_validation(res, 'get io data') + + ++def avg_get_iodump_data(io_dic): ++ """avg_get_iodump_data from sentryCollector""" ++ logging.debug(f"send to sentryCollector avg_get_iodump_data: period={io_dic['period_time']}, " ++ f"disk={io_dic['disk_list']}, stage={io_dic['stage_list']}, iotype={io_dic['iotype_list']}") ++ res = get_iodump_data(io_dic["period_time"], io_dic["disk_list"], io_dic["stage_list"], io_dic["iotype_list"]) ++ return 
check_result_validation(res, 'get io dump data')
++
++
+ def avg_is_iocollect_valid(io_dic, config_disk, config_stage):
+     """is_iocollect_valid from sentryCollector"""
+     logging.debug(f"send to sentryCollector is_iocollect_valid: period={io_dic['period_time']}, "
+@@ -89,6 +97,7 @@ def process_report_data(disk_name, rw, io_data):
+         msg["block_stack"] = f"bio,{stage_name}"
+         msg["alarm_type"] = abnormal_list
+         log_slow_win(msg, "IO press")
++        del msg["details"]["iodump_data"] # 极端场景下iodump_data可能过大,导致发送失败,所以只在日志中打印,不发送到告警模块
+         xalarm_report(1002, MINOR_ALM, ALARM_TYPE_OCCUR, json.dumps(msg))
+         return
+ 
+@@ -99,6 +108,7 @@ def process_report_data(disk_name, rw, io_data):
+         msg["block_stack"] = "bio,rq_driver"
+         msg["alarm_type"] = abnormal_list
+         log_slow_win(msg, "driver slow")
++        del msg["details"]["iodump_data"] # 极端场景下iodump_data可能过大,导致发送失败,所以只在日志中打印,不发送到告警模块
+         xalarm_report(1002, MINOR_ALM, ALARM_TYPE_OCCUR, json.dumps(msg))
+         return
+ 
+@@ -112,10 +122,12 @@ def process_report_data(disk_name, rw, io_data):
+         msg["block_stack"] = f"bio,{stage_name}"
+         msg["alarm_type"] = abnormal_list
+         log_slow_win(msg, "kernel slow")
++        del msg["details"]["iodump_data"] # 极端场景下iodump_data可能过大,导致发送失败,所以只在日志中打印,不发送到告警模块
+         xalarm_report(1002, MINOR_ALM, ALARM_TYPE_OCCUR, json.dumps(msg))
+         return
+ 
+     log_slow_win(msg, "unknown")
++    del msg["details"]["iodump_data"] # 极端场景下iodump_data可能过大,导致发送失败,所以只在日志中打印,不发送到告警模块
+     xalarm_report(1002, MINOR_ALM, ALARM_TYPE_OCCUR, json.dumps(msg))
+ 
+ 
+diff --git a/src/sentryPlugins/avg_block_io/stage_window.py b/src/sentryPlugins/avg_block_io/stage_window.py
+index 587bd49..29fa6e1 100644
+--- a/src/sentryPlugins/avg_block_io/stage_window.py
++++ b/src/sentryPlugins/avg_block_io/stage_window.py
+@@ -53,3 +53,21 @@ class IoDumpWindow(AbnormalWindowBase):
+ 
+     def is_abnormal_period(self, value, avg_val=0):
+         return value > self.abnormal_time
++
++
++class IopsWindow(AbnormalWindowBase):
++    def is_abnormal_period(self, value, avg_val=10):
++        return False
++
++
++class IodumpMsgWindow:
++    def __init__(self, window_size=10):
++        self.window_size = window_size
++        self.window_data = [[] for _ in range(window_size)]
++
++    def append_new_data(self, msg):
++        self.window_data.pop(0)
++        self.window_data.append(msg)
++
++    def window_data_to_string(self):
++        return str(self.window_data)
+\ No newline at end of file
+diff --git a/src/sentryPlugins/avg_block_io/utils.py b/src/sentryPlugins/avg_block_io/utils.py
+index d5f8bb4..d9af7fe 100644
+--- a/src/sentryPlugins/avg_block_io/utils.py
++++ b/src/sentryPlugins/avg_block_io/utils.py
+@@ -9,6 +9,7 @@
+ # PURPOSE.
+ # See the Mulan PSL v2 for more details.
+ import logging ++from .extra_logger import extra_slow_log + + AVG_VALUE = 0 + AVG_COUNT = 1 +@@ -39,6 +40,8 @@ def get_win_data(disk_name, rw, io_data): + """get latency and iodump win data""" + latency = '' + iodump = '' ++ iops = '' ++ iodump_data = '' + for stage_name in io_data[disk_name]: + if 'latency' in io_data[disk_name][stage_name][rw]: + latency_list = io_data[disk_name][stage_name][rw]['latency'].window_data_to_string() +@@ -46,7 +49,15 @@ def get_win_data(disk_name, rw, io_data): + if 'iodump' in io_data[disk_name][stage_name][rw]: + iodump_list = io_data[disk_name][stage_name][rw]['iodump'].window_data_to_string() + iodump += f'{stage_name}: [{iodump_list}], ' +- return {"latency": latency[:-2], "iodump": iodump[:-2]} ++ if 'iops' in io_data[disk_name][stage_name][rw]: ++ iops_list = io_data[disk_name][stage_name][rw]['iops'].window_data_to_string() ++ iops += f'{stage_name}: [{iops_list}], ' ++ if 'iodump_data' in io_data[disk_name][stage_name][rw]: ++ iodump_data_list = io_data[disk_name][stage_name][rw]['iodump_data'].window_data_to_string() ++ iodump_data += f'"{stage_name}": {iodump_data_list}, ' ++ if iodump_data: ++ iodump_data = '{' + iodump_data[:-2] + '}' ++ return {"latency": latency[:-2], "iodump": iodump[:-2], "iops": iops[:-2], "iodump_data": iodump_data} + + + def is_abnormal(io_key, io_data): +@@ -90,6 +101,8 @@ def update_io_data(period_value, io_data, io_key): + io_data[io_key[0]][io_key[1]][io_key[2]]["latency"].append_new_data(period_value[0]) + if all_wins and "iodump" in all_wins: + io_data[io_key[0]][io_key[1]][io_key[2]]["iodump"].append_new_data(period_value[1]) ++ if all_wins and "iops" in all_wins: ++ io_data[io_key[0]][io_key[1]][io_key[2]]["iops"].append_new_data(period_value[3]) + + + def log_abnormal_period(old_avg, period_value, io_data, io_key): +@@ -111,6 +124,8 @@ def log_slow_win(msg, reason): + f"iotype: {msg['io_type']}, type: {msg['alarm_type']}, reason: {reason}") + logging.info(f"latency: {msg['details']['latency']}") + logging.info(f"iodump: {msg['details']['iodump']}") ++ logging.info(f"iops: {msg['details']['iops']}") ++ extra_slow_log(msg) + + + def update_avg_and_check_abnormal(data, io_key, win_size, io_avg_value, io_data): +@@ -137,3 +152,15 @@ def update_avg_and_check_abnormal(data, io_key, win_size, io_avg_value, io_data) + return True + set_nested_value(io_avg_value, io_key, update_io_avg(old_avg, period_value, win_size)) + return True ++ ++ ++def update_avg_iodump_data(iodump_data, is_success, io_key, io_data): ++ """update iodump data to io_data""" ++ all_wins = get_nested_value(io_data, io_key) ++ if all_wins and "iodump_data" in all_wins: ++ if not is_success: ++ io_data[io_key[0]][io_key[1]][io_key[2]]["iodump_data"].append_new_data([]) ++ else: ++ period_value = get_nested_value(iodump_data, io_key) ++ io_data[io_key[0]][io_key[1]][io_key[2]]["iodump_data"].append_new_data(period_value) ++ +diff --git a/src/services/sentryCollector/collect_config.py b/src/services/sentryCollector/collect_config.py +index 7ca9898..5793fa3 100644 +--- a/src/services/sentryCollector/collect_config.py ++++ b/src/services/sentryCollector/collect_config.py +@@ -31,6 +31,10 @@ CONF_IO_DISK = 'disk' + CONF_IO_PERIOD_TIME_DEFAULT = 1 + CONF_IO_MAX_SAVE_DEFAULT = 10 + CONF_IO_DISK_DEFAULT = "default" ++CONF_IO_NVME_SSD = "nvme_ssd_threshold" ++CONF_IO_SATA_SSD = "sata_ssd_threshold" ++CONF_IO_SATA_HDD = "sata_hdd_threshold" ++CONF_IO_THRESHOLD_DEFAULT = 1000 + + # log + CONF_LOG = 'log' +@@ -144,5 +148,35 @@ class CollectConfig: + 
logging.debug("config get_io_config: %s", result_io_config)
+         return result_io_config
+ 
++    def get_io_threshold(self):
++        result_io_threshold = {}
++        io_map_value = self.load_module_config(CONF_IO)
++        # nvme ssd threshold
++        nvme_ssd_threshold = io_map_value.get(CONF_IO_NVME_SSD)
++        if nvme_ssd_threshold and nvme_ssd_threshold.isdigit() and int(nvme_ssd_threshold) >= 1:
++            result_io_threshold[CONF_IO_NVME_SSD] = int(nvme_ssd_threshold)
++        else:
++            logging.warning("module_name = %s section, field = %s is incorrect, use default %d",
++                            CONF_IO, CONF_IO_NVME_SSD, CONF_IO_THRESHOLD_DEFAULT)
++            result_io_threshold[CONF_IO_NVME_SSD] = CONF_IO_THRESHOLD_DEFAULT
++        # sata ssd threshold
++        sata_ssd_threshold = io_map_value.get(CONF_IO_SATA_SSD)
++        if sata_ssd_threshold and sata_ssd_threshold.isdigit() and int(sata_ssd_threshold) >= 1:
++            result_io_threshold[CONF_IO_SATA_SSD] = int(sata_ssd_threshold)
++        else:
++            logging.warning("module_name = %s section, field = %s is incorrect, use default %d",
++                            CONF_IO, CONF_IO_SATA_SSD, CONF_IO_THRESHOLD_DEFAULT)
++            result_io_threshold[CONF_IO_SATA_SSD] = CONF_IO_THRESHOLD_DEFAULT
++        # sata hdd threshold
++        sata_hdd_threshold = io_map_value.get(CONF_IO_SATA_HDD)
++        if sata_hdd_threshold and sata_hdd_threshold.isdigit() and int(sata_hdd_threshold) >= 1:
++            result_io_threshold[CONF_IO_SATA_HDD] = int(sata_hdd_threshold)
++        else:
++            logging.warning("module_name = %s section, field = %s is incorrect, use default %d",
++                            CONF_IO, CONF_IO_SATA_HDD, CONF_IO_THRESHOLD_DEFAULT)
++            result_io_threshold[CONF_IO_SATA_HDD] = CONF_IO_THRESHOLD_DEFAULT
++        logging.debug("config get_io_threshold: %s", result_io_threshold)
++        return result_io_threshold
++
+     def get_common_config(self):
+         return {key.lower(): value for key, value in self.config['common'].items()}
+diff --git a/src/services/sentryCollector/collect_io.py b/src/services/sentryCollector/collect_io.py
+index 6db28ec..612ee69 100644
+--- a/src/services/sentryCollector/collect_io.py
++++ b/src/services/sentryCollector/collect_io.py
+@@ -17,18 +17,25 @@ import time
+ import logging
+ import threading
+ import subprocess
++import re
+ from typing import Union
+ 
+ from .collect_config import CollectConfig
++from .collect_config import CONF_IO_NVME_SSD, CONF_IO_SATA_SSD, CONF_IO_SATA_HDD, CONF_IO_THRESHOLD_DEFAULT
++from .collect_plugin import get_disk_type, DiskType
+ 
+ Io_Category = ["read", "write", "flush", "discard"]
+ IO_GLOBAL_DATA = {}
+ IO_CONFIG_DATA = []
++IO_DUMP_DATA = {}
+ EBPF_GLOBAL_DATA = []
+ EBPF_PROCESS = None
+ EBPF_STAGE_LIST = ["wbt", "rq_driver", "bio", "gettag"]
+ EBPF_SUPPORT_VERSION = ["6.6.0"]
+ 
++# iodump data limit
++IO_DUMP_DATA_LIMIT = 10
++
+ class IoStatus():
+     TOTAL = 0
+     FINISH = 1
+@@ -50,6 +57,7 @@ class CollectIo():
+         self.ebpf_base_path = 'ebpf_collector'
+ 
+         self.loop_all = False
++        self.io_threshold_config = module_config.get_io_threshold()
+ 
+         if disk_str == "default":
+             self.loop_all = True
+@@ -57,10 +65,51 @@ class CollectIo():
+             self.disk_list = disk_str.strip().split(',')
+ 
+         self.stop_event = threading.Event()
++        self.iodump_pattern = re.compile(
++            r'(?P<task_name>[^-]+)-(?P<pid>\d+)\s+'
++            r'\w+\s+'
++            r'stage\s+(?P<stage>\w+)\s+'
++            r'(?P<ptr>[0-9a-fA-F]{16})\s+'
++            r'.*started\s+(?P<start_time_ns>\d+)\s+ns\s+ago'
++        )
+ 
+         IO_CONFIG_DATA.append(self.period_time)
+         IO_CONFIG_DATA.append(self.max_save)
+ 
++    def update_io_threshold(self, disk_name, stage_list):
++        disk_type_result = get_disk_type(disk_name)
++        if disk_type_result["ret"] == 0 and disk_type_result["message"] in ('0', '1', '2'):
++            disk_type = 
int(disk_type_result["message"]) ++ if disk_type == DiskType.TYPE_NVME_SSD: ++ config_threshold = str(self.io_threshold_config[CONF_IO_NVME_SSD]) ++ elif disk_type == DiskType.TYPE_SATA_SSD: ++ config_threshold = str(self.io_threshold_config[CONF_IO_SATA_SSD]) ++ elif disk_type == DiskType.TYPE_SATA_HDD: ++ config_threshold = str(self.io_threshold_config[CONF_IO_SATA_HDD]) ++ else: ++ return ++ ++ for stage in stage_list: ++ io_threshold_file = '/sys/kernel/debug/block/{}/blk_io_hierarchy/{}/threshold'.format(disk_name, stage) ++ try: ++ with open(io_threshold_file, 'r') as file: ++ current_threshold = file.read().strip() ++ except FileNotFoundError: ++ logging.error("The file %s does not exist.", io_threshold_file) ++ continue ++ except Exception as e: ++ logging.error("An error occurred while reading: %s", e) ++ continue ++ ++ if current_threshold != config_threshold: ++ try: ++ with open(io_threshold_file, 'w') as file: ++ file.write(config_threshold) ++ logging.info("update %s io_dump_threshold from %s to %s", ++ io_threshold_file, current_threshold, config_threshold) ++ except Exception as e: ++ logging.error("An error occurred while writing: %s", e) ++ + def get_blk_io_hierarchy(self, disk_name, stage_list): + stats_file = '/sys/kernel/debug/block/{}/blk_io_hierarchy/stats'.format(disk_name) + try: +@@ -95,6 +144,8 @@ class CollectIo(): + # read=0, write=1, flush=2, discard=3 + if (len(IO_GLOBAL_DATA[disk_name][stage][Io_Category[index]])) >= self.max_save: + IO_GLOBAL_DATA[disk_name][stage][Io_Category[index]].pop() ++ if (len(IO_DUMP_DATA[disk_name][stage][Io_Category[index]])) >= self.max_save: ++ IO_DUMP_DATA[disk_name][stage][Io_Category[index]].pop() + + curr_lat = self.get_latency_value(curr_stage_value, last_stage_value, index) + curr_iops = self.get_iops(curr_stage_value, last_stage_value, index) +@@ -102,6 +153,8 @@ class CollectIo(): + curr_io_dump = self.get_io_dump(disk_name, stage, index) + + IO_GLOBAL_DATA[disk_name][stage][Io_Category[index]].insert(0, [curr_lat, curr_io_dump, curr_io_length, curr_iops]) ++ if curr_io_dump == 0: ++ IO_DUMP_DATA[disk_name][stage][Io_Category[index]].insert(0, []) + + def get_iops(self, curr_stage_value, last_stage_value, category): + try: +@@ -151,11 +204,31 @@ class CollectIo(): + def get_io_dump(self, disk_name, stage, category): + io_dump_file = '/sys/kernel/debug/block/{}/blk_io_hierarchy/{}/io_dump'.format(disk_name, stage) + count = 0 ++ io_dump_msg = [] ++ pattern = self.iodump_pattern ++ + try: + with open(io_dump_file, 'r') as file: + for line in file: +- count += line.count('.op=' + Io_Category[category].upper()) ++ if line.count('.op=' + Io_Category[category].upper()) > 0: ++ match = pattern.match(line) ++ if match: ++ if count < IO_DUMP_DATA_LIMIT: ++ parsed = match.groupdict() ++ values = [ ++ parsed["task_name"], ++ parsed["pid"], ++ parsed["stage"], ++ parsed["ptr"], ++ str(int(parsed["start_time_ns"]) // 1000000) ++ ] ++ value_str = ",".join(values) ++ io_dump_msg.append(value_str) ++ else: ++ logging.info(f"io_dump parse err, info : {line.strip()}") ++ count += 1 + if count > 0: ++ IO_DUMP_DATA[disk_name][stage][Io_Category[category]].insert(0, io_dump_msg) + logging.info(f"io_dump info : {disk_name}, {stage}, {Io_Category[category]}, {count}") + except FileNotFoundError: + logging.error("The file %s does not exist.", io_dump_file) +@@ -211,6 +284,7 @@ class CollectIo(): + self.disk_map_stage[disk_name] = stage_list + self.window_value[disk_name] = {} + IO_GLOBAL_DATA[disk_name] = {} ++ IO_DUMP_DATA[disk_name] = {} + + 
return len(IO_GLOBAL_DATA) != 0 + +@@ -226,13 +300,16 @@ class CollectIo(): + self.disk_map_stage[disk_name] = EBPF_STAGE_LIST + self.window_value[disk_name] = {} + IO_GLOBAL_DATA[disk_name] = {} ++ IO_DUMP_DATA[disk_name] = {} + + for disk_name, stage_list in self.disk_map_stage.items(): + for stage in stage_list: + self.window_value[disk_name][stage] = {} + IO_GLOBAL_DATA[disk_name][stage] = {} ++ IO_DUMP_DATA[disk_name][stage] = {} + for category in Io_Category: + IO_GLOBAL_DATA[disk_name][stage][category] = [] ++ IO_DUMP_DATA[disk_name][stage][category] = [] + self.window_value[disk_name][stage][category] = [[0,0,0], [0,0,0]] + + return major_version in EBPF_SUPPORT_VERSION and os.path.exists('/usr/bin/ebpf_collector') and len(IO_GLOBAL_DATA) != 0 +@@ -311,6 +388,8 @@ class CollectIo(): + return + if (len(IO_GLOBAL_DATA[disk_name][stage][io_type])) >= self.max_save: + IO_GLOBAL_DATA[disk_name][stage][io_type].pop() ++ if (len(IO_DUMP_DATA[disk_name][stage][io_type])) >= self.max_save: ++ IO_DUMP_DATA[disk_name][stage][io_type].pop() + curr_finish_count, curr_latency, curr_io_dump_count = self.window_value[disk_name][stage][io_type][-1] + prev_finish_count, prev_latency, prev_io_dump_count = self.window_value[disk_name][stage][io_type][-2] + self.window_value[disk_name][stage][io_type].pop(0) +@@ -322,6 +401,7 @@ class CollectIo(): + if curr_io_dump > 0: + logging.info(f"ebpf io_dump info : {disk_name}, {stage}, {io_type}, {curr_io_dump}") + IO_GLOBAL_DATA[disk_name][stage][io_type].insert(0, [curr_lat, curr_io_dump, curr_io_length, curr_iops]) ++ IO_DUMP_DATA[disk_name][stage][io_type].insert(0, []) + + elapsed_time = time.time() - start_time + sleep_time = self.period_time - elapsed_time +@@ -419,6 +499,7 @@ class CollectIo(): + + def main_loop(self): + global IO_GLOBAL_DATA ++ global IO_DUMP_DATA + logging.info("collect io thread start") + + if self.is_kernel_avaliable() and len(self.disk_map_stage) != 0: +@@ -426,8 +507,11 @@ class CollectIo(): + for stage in stage_list: + self.window_value[disk_name][stage] = [] + IO_GLOBAL_DATA[disk_name][stage] = {} ++ IO_DUMP_DATA[disk_name][stage] = {} + for category in Io_Category: + IO_GLOBAL_DATA[disk_name][stage][category] = [] ++ IO_DUMP_DATA[disk_name][stage][category] = [] ++ self.update_io_threshold(disk_name, stage_list) + + while True: + start_time = time.time() +diff --git a/src/services/sentryCollector/collect_server.py b/src/services/sentryCollector/collect_server.py +index ad3ac0e..b045d4c 100644 +--- a/src/services/sentryCollector/collect_server.py ++++ b/src/services/sentryCollector/collect_server.py +@@ -24,7 +24,7 @@ import select + import threading + import time + +-from .collect_io import IO_GLOBAL_DATA, IO_CONFIG_DATA ++from .collect_io import IO_GLOBAL_DATA, IO_CONFIG_DATA, IO_DUMP_DATA + from .collect_config import CollectConfig + + SENTRY_RUN_DIR = "/var/run/sysSentry" +@@ -48,6 +48,7 @@ RES_MAGIC = "RES" + class ServerProtocol(): + IS_IOCOLLECT_VALID = 0 + GET_IO_DATA = 1 ++ GET_IODUMP_DATA = 2 + PRO_END = 3 + + class CollectServer(): +@@ -58,78 +59,86 @@ class CollectServer(): + + self.stop_event = threading.Event() + +- def is_iocollect_valid(self, data_struct): +- ++ @staticmethod ++ def get_io_common(data_struct, data_source): + result_rev = {} +- self.io_global_data = IO_GLOBAL_DATA + + if len(IO_CONFIG_DATA) == 0: + logging.error("the collect thread is not started, the data is invalid.") + return json.dumps(result_rev) +- + period_time = IO_CONFIG_DATA[0] + max_save = IO_CONFIG_DATA[1] + +- disk_list = 
json.loads(data_struct['disk_list']) + period = int(data_struct['period']) ++ disk_list = json.loads(data_struct['disk_list']) + stage_list = json.loads(data_struct['stage']) ++ iotype_list = json.loads(data_struct['iotype']) + + if (period < period_time) or (period > period_time * max_save) or (period % period_time): +- logging.error("is_iocollect_valid: period time is invalid, user period: %d, config period_time: %d", period, period_time) ++ logging.error("get_io_common: period time is invalid, user period: %d, config period_time: %d", ++ period, period_time) + return json.dumps(result_rev) + +- for disk_name, stage_info in self.io_global_data.items(): +- if len(disk_list) > 0 and disk_name not in disk_list: +- continue +- result_rev[disk_name] = [] +- if len(stage_list) == 0: +- result_rev[disk_name] = list(stage_info.keys()) ++ collect_index = period // period_time - 1 ++ logging.debug("user period: %d, config period_time: %d, collect_index: %d", period, period_time, collect_index) ++ ++ for disk_name, stage_info in data_source.items(): ++ if disk_name not in disk_list: + continue +- for stage_name, stage_data in stage_info.items(): +- if stage_name in stage_list: +- result_rev[disk_name].append(stage_name) ++ result_rev[disk_name] = {} ++ for stage_name, iotype_info in stage_info.items(): ++ if len(stage_list) > 0 and stage_name not in stage_list: ++ continue ++ result_rev[disk_name][stage_name] = {} ++ for iotype_name, iotype_data in iotype_info.items(): ++ if iotype_name not in iotype_list: ++ continue ++ if len(iotype_data) - 1 < collect_index: ++ continue ++ result_rev[disk_name][stage_name][iotype_name] = iotype_data[collect_index] + + return json.dumps(result_rev) + +- def get_io_data(self, data_struct): ++ def is_iocollect_valid(self, data_struct): ++ + result_rev = {} + self.io_global_data = IO_GLOBAL_DATA + + if len(IO_CONFIG_DATA) == 0: + logging.error("the collect thread is not started, the data is invalid.") + return json.dumps(result_rev) ++ + period_time = IO_CONFIG_DATA[0] + max_save = IO_CONFIG_DATA[1] + +- period = int(data_struct['period']) + disk_list = json.loads(data_struct['disk_list']) ++ period = int(data_struct['period']) + stage_list = json.loads(data_struct['stage']) +- iotype_list = json.loads(data_struct['iotype']) + + if (period < period_time) or (period > period_time * max_save) or (period % period_time): +- logging.error("get_io_data: period time is invalid, user period: %d, config period_time: %d", period, period_time) ++ logging.error("is_iocollect_valid: period time is invalid, user period: %d, config period_time: %d", period, period_time) + return json.dumps(result_rev) + +- collect_index = period // period_time - 1 +- logging.debug("user period: %d, config period_time: %d, collect_index: %d", period, period_time, collect_index) +- + for disk_name, stage_info in self.io_global_data.items(): +- if disk_name not in disk_list: ++ if len(disk_list) > 0 and disk_name not in disk_list: + continue +- result_rev[disk_name] = {} +- for stage_name, iotype_info in stage_info.items(): +- if len(stage_list) > 0 and stage_name not in stage_list: +- continue +- result_rev[disk_name][stage_name] = {} +- for iotype_name, iotype_info in iotype_info.items(): +- if iotype_name not in iotype_list: +- continue +- if len(iotype_info) - 1 < collect_index: +- continue +- result_rev[disk_name][stage_name][iotype_name] = iotype_info[collect_index] ++ result_rev[disk_name] = [] ++ if len(stage_list) == 0: ++ result_rev[disk_name] = list(stage_info.keys()) ++ continue ++ for 
stage_name, stage_data in stage_info.items(): ++ if stage_name in stage_list: ++ result_rev[disk_name].append(stage_name) + + return json.dumps(result_rev) + ++ def get_io_data(self, data_struct): ++ self.io_global_data = IO_GLOBAL_DATA ++ return self.get_io_common(data_struct, self.io_global_data) ++ ++ def get_iodump_data(self, data_struct): ++ return self.get_io_common(data_struct, IO_DUMP_DATA) ++ + def msg_data_process(self, msg_data, protocal_id): + """message data process""" + logging.debug("msg_data %s", msg_data) +@@ -144,6 +153,8 @@ class CollectServer(): + res_msg = self.is_iocollect_valid(data_struct) + elif protocal_id == ServerProtocol.GET_IO_DATA: + res_msg = self.get_io_data(data_struct) ++ elif protocal_id == ServerProtocol.GET_IODUMP_DATA: ++ res_msg = self.get_iodump_data(data_struct) + + return res_msg + +-- +2.48.1 + diff --git a/sysSentry.spec b/sysSentry.spec index dab283c..59e2431 100644 --- a/sysSentry.spec +++ b/sysSentry.spec @@ -4,7 +4,7 @@ Summary: System Inspection Framework Name: sysSentry Version: 1.0.3 -Release: 17 +Release: 18 License: Mulan PSL v2 Group: System Environment/Daemons Source0: https://gitee.com/openeuler/sysSentry/releases/download/v%{version}/%{name}-%{version}.tar.gz @@ -37,6 +37,7 @@ Patch25: Use-panic-instead-of-coredump-file.patch Patch26: Fix-Security-Scan-Warning.patch Patch27: Fix-two-code-review-comments.patch Patch28: Add-MulanV2-License-statement.patch +Patch29: slow-io-plugin-upgrade.patch BuildRequires: cmake gcc-c++ BuildRequires: python3 python3-setuptools @@ -289,6 +290,12 @@ rm -rf /var/run/sysSentry | : %attr(0600,root,root) %config(noreplace) %{_sysconfdir}/sysSentry/tasks/soc_ring_sentry.mod %changelog +* Mon Nov 3 2025 hewanhan - 1.0.3-18 +- Type:feature +- CVE:NA +- SUG:NA +- DESC:slow io plugin upgrade + * Tue Oct 21 2025 Qizhi Zhang - 1.0.3-17 - Type:feature - CVE:NA -- Gitee