From 90b5ec44f1d007f230f2882d909b8c3dd121873a Mon Sep 17 00:00:00 2001
From: huangbin
Date: Mon, 21 Jul 2025 20:38:20 +0800
Subject: [PATCH] optimize failslow with data queue. update docs.

---
 docs/0.quickstart.md                          | 55 +++++++++++++-
 failslow/config/model_config.json             |  8 +-
 .../failslow/dataloader/step_time_reader.py   | 41 +++++++---
 failslow/failslow/fail_slow_detection.py      | 75 ++++++++++++-------
 4 files changed, 136 insertions(+), 43 deletions(-)

diff --git a/docs/0.quickstart.md b/docs/0.quickstart.md
index 187a870..6561ccc 100644
--- a/docs/0.quickstart.md
+++ b/docs/0.quickstart.md
@@ -118,21 +118,38 @@ python /sysTrace/convert/convert_osprobe_to_timeline.py --outp
 
 ### Data analysis
 
-Installing the failslow algorithm package:
+#### failslow parameter configuration
+
+| Parameter                   | Type   | Description                                                  | Example                                                      |
+| --------------------------- | ------ | ------------------------------------------------------------ | ------------------------------------------------------------ |
+| training_log                | string | Input path of the torch data for the algorithm, ending with `*.timeline`; data from rank 0 is sufficient | /home/workspace/hbdir/systrace/localhost.localdomain--00000.timeline |
+| fail_slow_perception_path   | string | Output of the degradation perception algorithm, saved as a JSON file | /etc/systrace/result/fail_slow/fail_slow_perception_result_failSlow_1753099791.json |
+| max_data_queue_steps        | int    | Maximum length of the cached step-data queue                 | 500                                                          |
+| min_startup_detection_steps | int    | Minimum number of steps required to start detection          | 10                                                           |
+| task_stable_step            | int    | Number of steps for training to stabilize when the task starts | 3                                                            |
+| fail_slow_span_mins         | float  | Detection period of the degradation perception algorithm, in minutes | 30                                                           |
+| hang_times_mins_thr         | float  | Threshold for deciding whether the task hangs, in minutes    | 30                                                           |
+| steps_window_size           | int    | Sliding window size                                          | 5                                                            |
+| k_sigma                     | int    | k value for the k-sigma (boxplot) rule                       | 3                                                            |
+| anomaly_degree_thr          | float  | How far a detected value may deviate from the moving average | 0.2                                                          |
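+
+These keys live in `failslow/config/model_config.json` (see the config diff further below). A minimal configuration sketch; the values are illustrative and simply mirror that file, not tuning recommendations:
+
+```json
+{
+    "training_log": "/home/workspace/hbdir/systrace/localhost.localdomain--00000.timeline",
+    "fail_slow_perception_path": "/etc/systrace/result/fail_slow",
+    "max_data_queue_steps": 1000,
+    "min_startup_detection_steps": 10,
+    "task_stable_step": 3,
+    "fail_slow_span_mins": 30,
+    "hang_times_mins_thr": 30,
+    "steps_window_size": 10,
+    "k_sigma": 2,
+    "anomaly_degree_thr": 0.2
+}
+```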
+
+#### The failslow algorithm
+
+Installing the algorithm package:
 
 ```shell
 ## install
 python setup.py install
 ```
 
-Running the performance degradation perception algorithm:
+Running the perception algorithm:
 
 ```shell
 ## L0 perception
 python failslow/fail_slow_detection.py
 ```
 
-Running the slow-card demarcation algorithm:
+Running the slow-card demarcation algorithm
 
 ```shell
 ## L1 demarcation
 python failslow/main.py
 ```
 
@@ -143,5 +160,35 @@
 
 **Note: before running the algorithms, configure the corresponding data paths as described in the [reference documentation](https://gitee.com/openeuler/sysTrace/blob/master/failslow/docs/conf_introduction.md).**
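+
+Conceptually, the L0 perception applies a k-sigma rule over a sliding window of step times: a step is flagged when it deviates from the window's moving average by more than `k_sigma` standard deviations and the relative deviation also exceeds `anomaly_degree_thr`. A minimal sketch of the idea (simplified from `failslow/fail_slow_detection.py`, not the packaged implementation):
+
+```python
+import numpy as np
+
+def flag_slow_steps(step_times, window_size=10, k_sigma=2, anomaly_degree_thr=0.2):
+    """Return indices of steps that deviate from the moving average."""
+    anomalies = []
+    for i in range(window_size, len(step_times)):
+        window = np.asarray(step_times[i - window_size:i])
+        mean, std = window.mean(), window.std()
+        diff = abs(step_times[i] - mean)
+        # k-sigma rule plus a relative-degree filter, as in the perception code
+        if diff > k_sigma * std and mean > 0 and diff / mean > anomaly_degree_thr:
+            anomalies.append(i)
+    return anomalies
+
+# the 69435 ms step stands out against the ~2500 ms baseline -> prints [10]
+print(flag_slow_steps([2500, 2520, 2490, 2510, 2505, 2495, 2515, 2500, 2498, 2502, 69435]))
+```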
 
-## 
+#### failslow algorithm output fields
+
+| Field               | Type   | Description                                   | Example                                                      |
+| ------------------- | ------ | --------------------------------------------- | ------------------------------------------------------------ |
+| is_anomaly          | bool   | Whether the inspected data is anomalous       | True                                                         |
+| anomaly_count_times | int    | Number of detected anomaly points             | 1                                                            |
+| anomaly_info        | list   | Anomaly details; one element per anomaly point | [{<br/>'training_step': 16,<br/>'anomaly_time': '2025-06-12 19:39:24',<br/>'anomaly_degree': 26.053,<br/>'anomaly_training_time': '69435ms',<br/>'normal_training_time': '2566.6ms'<br/>}] |
+| anomaly_type        | string | Result type: normal, failSlow, hang           | failSlow                                                     |
+| start_time          | int    | Detection start time, millisecond timestamp   | 1749728380752                                                |
+| end_time            | int    | Detection end time, millisecond timestamp     | 1749728419305                                                |
+
+Example output:
+
+```json
+{
+    "is_anomaly": true,
+    "anomaly_count_times": 1,
+    "anomaly_info": [{
+        "training_step": 16,
+        "anomaly_time": "2025-06-12 19:39:24",
+        "anomaly_degree": 26.053,
+        "anomaly_training_time": "69435ms",
+        "normal_training_time": "2566.6ms"
+    }],
+    "anomaly_type": "failSlow",
+    "start_time": 1749728380752,
+    "end_time": 1749728419305
+}
+```
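+
+One way a monitoring script might consume this result file (a minimal sketch; the `fail_slow_perception_result_*.json` naming is an assumption based on the example path above):
+
+```python
+import glob
+import json
+
+# newest perception result under fail_slow_perception_path
+result_files = sorted(glob.glob(
+    "/etc/systrace/result/fail_slow/fail_slow_perception_result_*.json"))
+if result_files:
+    with open(result_files[-1]) as f:
+        result = json.load(f)
+    if result.get("anomaly_type", "").lower() == "hang":
+        print("training appears hung:", result["anomaly_info"])
+    elif result.get("is_anomaly"):
+        for point in result["anomaly_info"]:
+            print(f"slow step {point['training_step']}: "
+                  f"{point['anomaly_training_time']} vs normal {point['normal_training_time']}")
+```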
+
diff --git a/failslow/config/model_config.json b/failslow/config/model_config.json
index 62464cd..a87d069 100644
--- a/failslow/config/model_config.json
+++ b/failslow/config/model_config.json
@@ -1,11 +1,13 @@
 {
     "with_fail_slow": false,
     "task_stable_step": 3,
+    "max_data_queue_steps": 1000,
     "min_startup_detection_steps": 10,
-    "fail_slow_span_mins": 10,
-    "training_log": "/home/sysTrace_dataloader/timeline/localhost.localdomain--00000.timeline",
+    "fail_slow_span_mins": 30,
+    "training_log": "/home/workspace/hbdir/systrace/localhost.localdomain--00000.timeline",
     "fail_slow_perception_path": "/etc/systrace/result/fail_slow",
-    "steps_window_size": 5,
+    "hang_times_mins_thr": 30,
+    "steps_window_size": 10,
     "k_sigma": 2,
     "anomaly_degree_thr": 0.2,
     "slow_node_detection_range_times": [],
diff --git a/failslow/failslow/dataloader/step_time_reader.py b/failslow/failslow/dataloader/step_time_reader.py
index a047f05..b500c3e 100644
--- a/failslow/failslow/dataloader/step_time_reader.py
+++ b/failslow/failslow/dataloader/step_time_reader.py
@@ -18,6 +18,9 @@ logger = get_default_logger(__name__)
 class StepReader:
     def __init__(self):
         self.save_path = None
+        self.former_first_event = None
+        self.is_update = True
+        self.debug = False  # when True, skip the first-event freshness check
 
     def init_save_path(self, file_path):
         self.save_path = os.path.dirname(file_path)
@@ -32,7 +35,25 @@
         before_start = None
         timestamps = []
         steps_time = []
+        self.is_update = True  # assume fresh data until the first event proves otherwise
+        if self.debug:
+            index = 1
+        else:
+            index = 0
+
         for stage in pytorch_data.pytorch_stages:
+            if index == 0:
+                if self.former_first_event:
+                    # an unchanged first event means the torch data was not
+                    # updated, so there is nothing new to parse or detect
+                    if self.former_first_event.start_us == stage.start_us:
+                        self.is_update = False
+                        logger.info("data not updated.")
+                        break
+                    else:
+                        self.former_first_event = stage
+                else:
+                    self.former_first_event = stage
+
+            index += 1
             if "dataloader" in stage.stage_type:
                 start_ms = int(stage.start_us / 1000)
                 end_ms = int(stage.end_us / 1000)
@@ -43,17 +64,19 @@
                     steps_time.append(start_ms - before_start)
                 timestamps.append(start_ms)
                 before_start = start_ms
         data = {
             'time': timestamps,
             'step_time': steps_time
         }
         df = pd.DataFrame(data)
-        logger.info(f"step time data: {steps_time}.")
-        save_file_path = os.path.join(self.save_path, "step_time.csv")
-        df.to_csv(save_file_path, index=False)
-        logger.info(f"Save file in {save_file_path}")
-
+        if self.is_update:
+            logger.info(f"step time data: {steps_time}.")
+            save_file_path = os.path.join(self.save_path, "step_time.csv")
+            df.to_csv(save_file_path, index=False)
+            logger.info(f"Save file in {save_file_path}")
+        else:
+            logger.info("data not updated, skip saving.")
+
         return df
 
     def get_step_data_from_training_log(self, log_file_path: str) -> pd.DataFrame:
diff --git a/failslow/failslow/fail_slow_detection.py b/failslow/failslow/fail_slow_detection.py
index c8bf7fd..67d3a4a 100644
--- a/failslow/failslow/fail_slow_detection.py
+++ b/failslow/failslow/fail_slow_detection.py
@@ -1,6 +1,8 @@
 '''
 env FAIL_SLOW_STOP: control fail slow stop
+2025-07-21: add data match for L0 data covering
+
 '''
 import re
 import json
@@ -17,6 +19,9 @@
 from failslow.dataloader.step_time_reader import StepReader
 
 logger = get_default_logger(__name__)
 
+DATA_QUEUE = pd.DataFrame({'time': [], 'step_time': []})
+DROP_DATA_LENGTH = 0
+
@@ -26,13 +31,12 @@
 def detect_step_time_anomalies(data_df: pd.DataFrame, model_args: Dict):
     """
     Detect anomalies in the step_time series and record their details.
 
    :param data_df: DataFrame with time and step_time columns
    :param threshold: anomaly threshold: how many standard deviations the current value may deviate from the moving average before it counts as anomalous
    :return: list of anomaly records, one element per anomaly point (step index, anomaly degree)
     """
-    window_size = model_args.get("steps_window_size", 5)
+    window_size = model_args.get("steps_window_size", 10)
     k_sigma_threshold = model_args.get("k_sigma", 2)
     anomaly_degree_thr = model_args.get("anomaly_degree_thr", 0.2)
     anomalies = []
     step_times = data_df["step_time"]
     timestamps = data_df["time"]
-    # print(f"training step time: {step_times}")
     for i in range(len(step_times)):
         if i < window_size:
             continue
@@ -58,7 +62,7 @@
             next_anomaly_degree = next_diff / anomaly_degree_thr
             if next_anomaly_degree > anomaly_degree_thr:
                 anomalies.append(
-                    {"training_step": i, "anomaly_time": timestamps[i].strftime('%Y-%m-%d %H:%M:%S'),
+                    {"training_step": i, "anomaly_time": datetime.fromtimestamp(timestamps[i] / 1000).strftime('%Y-%m-%d %H:%M:%S'),
                      "anomaly_degree": round(anomaly_degree, 3),
                      "anomaly_training_time": f"{current_step_time}ms",
                      "normal_training_time": f"{moving_average}ms"})
@@ -103,6 +107,18 @@ def get_extract_func_str(log_type: str):
 
     return extrct_func_dict.get(log_type, None)
 
+
+def update_queue_data(data: pd.DataFrame, max_data_queue_steps: int):
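+    """Append newly parsed step data to the bounded global queue.
+
+    Once the queue exceeds max_data_queue_steps, the oldest steps are dropped
+    and DROP_DATA_LENGTH records how many, so that queue-relative indices can
+    still be mapped back to absolute training-step numbers.
+    """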
+    global DATA_QUEUE
+    global DROP_DATA_LENGTH
+    if len(DATA_QUEUE) > 0:
+        history_data_length = len(DATA_QUEUE)
+        DATA_QUEUE = pd.concat([DATA_QUEUE, data], axis=0, ignore_index=True)
+        if len(DATA_QUEUE) > max_data_queue_steps:
+            # trim from the front, never dropping more than one batch of history
+            start_data_length = min(len(data), history_data_length)
+            DROP_DATA_LENGTH += start_data_length
+            DATA_QUEUE = DATA_QUEUE[start_data_length:].reset_index(drop=True)
+    else:
+        DATA_QUEUE = data
+
 
 def run_slow_node_perception(args: Dict):
     training_log = args.get("training_log", "./log/rank0_mindformer.log")
@@ -111,13 +127,15 @@ def run_slow_node_perception(args: Dict):
     log_type = args.get("log_type", "timeline")
     task_stable_step = args.get("task_stable_step", 2)  # just for first time detection
-    fail_slow_span_mins = args.get("fail_slow_span_mins", 0.1)
+    fail_slow_span_mins = args.get("fail_slow_span_mins", 0.1)  # for detection interval
+    max_data_queue_steps = args.get("max_data_queue_steps", 100)
     min_statup_detection_steps = args.get("min_startup_detection_steps", 10)
-    hang_times_thr = args.get("hang_times_thr", 5)
+    hang_times_mins_thr = args.get("hang_times_mins_thr", 0.5)
 
     detecting_range_steps = [0, 0]
     first_flag = False
     hang_info = []
+    hang_time_stamp = []
     next_detection_timestamp = None
     timer_flag = False
 
@@ -125,59 +143,62 @@ def run_slow_node_perception(args: Dict):
     log_extract_func = getattr(step_reader, get_extract_func_str(log_type))
 
     while True:
-        # now_time = datetime.now(timezone.utc).astimezone().astimezone()
-        # now_timestamp = now_time.timestamp()
-        # if next_detection_timestamp and now_timestamp > next_detection_timestamp:
-        #     print("waiting run detection.....")
-        # else:
-        #     continue
-        # next_detection_timestamp = (now_time + timedelta(minutes=fail_slow_span_mins)).timestamp()
         if timer_flag:
             time.sleep(fail_slow_span_mins * 60)
         timer_flag = True
-
         data = log_extract_func(training_log)
-        training_steps = len(data)
+        update_queue_data(data, max_data_queue_steps)
+
+        training_steps = len(DATA_QUEUE)
         if not training_steps:
             logger.info(f"training data is empty.")
             continue
 
         # if data not training, record not training times
         # remove model init process
-        if training_steps == (detecting_range_steps[1]) and detecting_range_steps[1]:
+        # data not updated
+        if (training_steps + DROP_DATA_LENGTH) == detecting_range_steps[1] and detecting_range_steps[1] and (not step_reader.is_update):
             logger.info("start hang detection")
-
-            now_time = datetime.now(timezone.utc).astimezone().astimezone()
+            now_time = datetime.now(timezone.utc).astimezone()
+            now_time_stamp = now_time.timestamp()
             format_str = '%Y-%m-%d %H:%M:%S %z'
             now_time_str = now_time.strftime(format_str)
+            hang_time_stamp.append(now_time_stamp)
             hang_info.append(now_time_str)
-            if len(hang_info) > hang_times_thr:
+            hang_times = round((now_time_stamp - hang_time_stamp[0]) / 60, 2)
+            logger.info(f"hang time: {hang_times} min")
+            if hang_time_stamp and hang_times > hang_times_mins_thr:
                 # record hang
                 anomaly_info = {
+                    "is_anomaly": True,
+                    "anomaly_count_times": 1,
+                    "anomaly_info": [{
+                        "detect_point": hang_info,
+                        "hang_minutes": hang_times
+                    }],
                     "anomaly_type": AnomalyType.hang,
-                    "detect_point": hang_info,
-                    "hang_minutes": fail_slow_span_mins * hang_times_thr
+                    "start_time": int(hang_time_stamp[0] * 1000),
+                    "end_time": int(hang_time_stamp[-1] * 1000)
                 }
                 logger.info(f"hang detection find training process is hang at: {hang_info[0]}")
                 write_anomaly_info(anomaly_info, fail_slow_perception_result)
             continue
         else:
             hang_info = []
+            hang_time_stamp = []
 
         new_detecting_range_steps = [0, 0]
+        new_detecting_range_steps[1] = training_steps + DROP_DATA_LENGTH
         if not detecting_range_steps[1]:
             first_flag = True
-            new_detecting_range_steps[1] = training_steps
         else:
             if first_flag:
                 # second time detect, start not change
                 new_detecting_range_steps[0] = detecting_range_steps[0]
             else:
+                # main update: advance the start to the middle of the last range
                 new_detecting_range_steps[0] = (detecting_range_steps[0] + detecting_range_steps[1]) // 2
             first_flag = False
-            new_detecting_range_steps[1] = training_steps
-
         range_steps = new_detecting_range_steps[1] - new_detecting_range_steps[0]
         if range_steps < min_statup_detection_steps:
             logger.warning(
@@ -187,9 +208,9 @@ def run_slow_node_perception(args: Dict):
         detecting_range_steps = new_detecting_range_steps
         if first_flag:
             detecting_range_steps[0] = task_stable_step
-        logger.info(f"Detection data: {detecting_range_steps}.")
-        data = data.loc[detecting_range_steps[0]: detecting_range_steps[1]].reset_index(drop=True)
-        anomaly_info = detect_step_time_anomalies(data, args)
+        logger.info(f"Detection data range: {detecting_range_steps}, data queue: {len(DATA_QUEUE)}, drop data length: {DROP_DATA_LENGTH}.")
+        detected_data = DATA_QUEUE.loc[(detecting_range_steps[0] - DROP_DATA_LENGTH): (detecting_range_steps[1] - DROP_DATA_LENGTH)].reset_index(drop=True)
+        anomaly_info = detect_step_time_anomalies(detected_data, args)
 
         write_anomaly_info(anomaly_info, fail_slow_perception_result)
-- 
Gitee