From 598a2e3590fc0abf8a3599cae737a2452bf1a5f3 Mon Sep 17 00:00:00 2001
From: houxu
Date: Fri, 15 Aug 2025 11:16:09 +0800
Subject: [PATCH] update fail_slow_detection

---
 .../service/systrace-mcpserver.service        |  2 +-
 .../systrace_mcp/fail_slow_detection_api.py   | 41 ++++++++------
 systrace_mcp/systrace_mcp/mcp_data.py         | 10 ++--
 systrace_mcp/systrace_mcp/mcp_server.py       | 34 ++++++------
 .../systrace_mcp/remote_file_fetcher.py       | 54 +++++++++++--------
 5 files changed, 82 insertions(+), 59 deletions(-)

diff --git a/systrace_mcp/service/systrace-mcpserver.service b/systrace_mcp/service/systrace-mcpserver.service
index 8964806..e488cd2 100644
--- a/systrace_mcp/service/systrace-mcpserver.service
+++ b/systrace_mcp/service/systrace-mcpserver.service
@@ -4,7 +4,7 @@ After=network.target
 
 [Service]
 Type=exec
-ExecStart=/usr/bin/systrac-mcpserver
+ExecStart=/usr/bin/systrace-mcpserver
 Restart=on-failure
 RestartSec=1
 
diff --git a/systrace_mcp/systrace_mcp/fail_slow_detection_api.py b/systrace_mcp/systrace_mcp/fail_slow_detection_api.py
index 2556b7d..6b20e3f 100644
--- a/systrace_mcp/systrace_mcp/fail_slow_detection_api.py
+++ b/systrace_mcp/systrace_mcp/fail_slow_detection_api.py
@@ -21,7 +21,12 @@ logger = get_default_logger(__name__)
 
 DATA_QUEUE = pd.DataFrame({'time': [], 'step_time': []})
 DROP_DATA_LENGTH = 0
+detecting_range_steps = [0, 0]
+first_flag = False
+hang_info = []
+hang_time_stamp = []
+step_reader = StepReader()
 
 
 def detect_step_time_anomalies(data_df: pd.DataFrame, model_args: Dict):
     """
@@ -148,20 +153,12 @@ def run_slow_node_perception(args: Dict, task_id: str):
     min_statup_detection_steps = args.get("min_startup_detection_steps", 10)
     hang_times_mins_thr = args.get("hang_times_mins_thr", 0.5)
 
-    detecting_range_steps = [0, 0]
-    first_flag = False
-    hang_info = []
-    hang_time_stamp = []
-    next_detection_timestamp = None
-    timer_flag = False
-
-    step_reader = StepReader()
+    global detecting_range_steps
+    global first_flag
+    global hang_info
+    global hang_time_stamp
 
     log_extract_func = getattr(step_reader, get_extract_func_str(log_type))
-    if timer_flag:
-        time.sleep(fail_slow_span_mins * 60)
-        timer_flag = True
-
     data = log_extract_func(training_log)
     update_queue_data(data, max_data_queue_steps)
     training_steps = len(DATA_QUEUE)
@@ -181,22 +178,35 @@ def run_slow_node_perception(args: Dict, task_id: str):
             hang_info.append(now_time_str)
             hang_times = round((now_time_stamp - hang_time_stamp[0]) / 60, 2)
             logger.info(f"hang time min: {hang_times}")
+
+            anomaly_info = {}
+            anomaly_info["is_anomaly"] = True
+            anomaly_info["anomaly_count_times"] = 0
+            anomaly_info["anomaly_info"] = []
+            anomaly_info["anomaly_type"] = AnomalyType.hang
+            anomaly_info["start_time"] = int(hang_time_stamp[0] * 1000)
+            anomaly_info["end_time"] = int(hang_time_stamp[-1] * 1000)
+            anomaly_info["anomaly_info"] = [{
+                "detect_point": hang_info,
+                "hang_minutes": hang_times
+            }]
+            anomaly_info["task_id"] = task_id
         if hang_time_stamp and hang_times > hang_times_mins_thr:
             # record hang
             anomaly_info = {
                 "is_anomaly": True,
                 "anomaly_count_times": 0,
                 "anomaly_info": [{
                     "detect_point": hang_info,
                     "hang_minutes": hang_times
                 }],
                 "anomaly_type": AnomalyType.hang,
                 "start_time": int(hang_time_stamp[0] * 1000),
-                "end_time": int(hang_time_stamp[1] * 1000)
+                "end_time": int(hang_time_stamp[-1] * 1000)
             }
             logger.info(f"hang detection find training process is hang at: {hang_info[0]}")
             write_anomaly_info(anomaly_info, fail_slow_perception_result)
-            return f"hang detection find training process is hang at: {hang_info[0]}"
+            return anomaly_info
         else:
             hang_info = []
             hang_time_stamp = []
@@ -217,7 +227,7 @@ def run_slow_node_perception(args: Dict, task_id: str):
         if range_steps < min_statup_detection_steps:
             logger.warning(
                 f"[Warning] detecting range step {range_steps} should larger than {min_statup_detection_steps}.")
-            return f"[Warning] detecting range step {range_steps} should larger than {min_statup_detection_steps}."
+            raise ValueError(f"[Warning] detecting range step {range_steps} should be larger than {min_statup_detection_steps}.")
         detecting_range_steps = new_detecting_range_steps
 
     if first_flag:
@@ -229,6 +239,5 @@ def run_slow_node_perception(args: Dict, task_id: str):
         anomaly_info = detect_step_time_anomalies(detected_data, args)
 
     write_anomaly_info(anomaly_info, fail_slow_perception_result)
-    fail_slow_stop_flag = os.getenv('FAIL_SLOW_STOP', 'False').lower() == "true"
     anomaly_info["task_id"] = task_id
     return anomaly_info
diff --git a/systrace_mcp/systrace_mcp/mcp_data.py b/systrace_mcp/systrace_mcp/mcp_data.py
index 04837c4..68632f2 100644
--- a/systrace_mcp/systrace_mcp/mcp_data.py
+++ b/systrace_mcp/systrace_mcp/mcp_data.py
@@ -1,8 +1,7 @@
-from typing import List
+from typing import List, Union
 from pydantic import BaseModel, Field
 from enum import Enum
 
-
 class ReportType(str, Enum):
     normal = "normal"
     anomaly = "anomaly"
@@ -16,13 +15,16 @@ class AnomalyInfo(BaseModel):
     anomaly_training_time: str = Field(default="", description="劣化训练step时间(默认空字符串)")
     normal_training_time: str = Field(default="", description="正常训练step时间(默认空字符串)")
 
-
+class AnomalyInfo2(BaseModel):
+    """劣化详细信息结构"""
+    detect_point: str = Field(default="", description="检测点")
+    hang_minutes: str = Field(default="", description="hang的分钟数")
 class PerceptionResult(BaseModel):
     """慢节点感知结果结构"""
     is_anomaly: bool = Field(default=False, description="是否发生性能劣化(默认false)")
     anomaly_count_times: int = Field(default=0, description="劣化次数(默认0)")
     # 列表类型使用 default_factory 避免 mutable 默认值问题
-    anomaly_info: List[AnomalyInfo] = Field(
+    anomaly_info: Union[List[AnomalyInfo], List[AnomalyInfo2]] = Field(
         default_factory=list,
         description="劣化详细信息(默认空列表)"
     )
diff --git a/systrace_mcp/systrace_mcp/mcp_server.py b/systrace_mcp/systrace_mcp/mcp_server.py
index ada6926..36cf9a9 100644
--- a/systrace_mcp/systrace_mcp/mcp_server.py
+++ b/systrace_mcp/systrace_mcp/mcp_server.py
@@ -9,7 +9,7 @@ from failslow.util.constant import MODEL_CONFIG_PATH
 from failslow.main import main as slow_node_detection_api
 from systrace_mcp.report_api import generate_normal_report, generate_degraded_report
-from systrace_mcp.mcp_data import PerceptionResult,ReportType,AIJobDetectResult
+from systrace_mcp.mcp_data import PerceptionResult, ReportType, AIJobDetectResult
 from systrace_mcp.fail_slow_detection_api import run_slow_node_perception
 from systrace_mcp.remote_file_fetcher import sync_server_by_ip_and_type
 
 
@@ -17,18 +17,22 @@ logger = get_default_logger(__name__)
 # 仅在 Linux 环境下强制使用 spawn 方式
 import multiprocessing
 import os
-
+import logging
+import sys
+
+# 配置日志系统
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
+    handlers=[logging.StreamHandler(sys.stdout)]
+)
+logger = logging.getLogger("systrace_mcpserver")
 if os.name == "posix":  # posix 表示 Linux/macOS
     multiprocessing.set_start_method("spawn", force=True)
 
 # 创建MCP Server
 mcp = FastMCP("SysTrace MCP Server", host="0.0.0.0", port=12145)
 
-@mcp.prompt(description="工具定位")
-def self_introduction() -> str:
-    return "面向运维、开发人员,支持自然语言对接,实现启发式调优,实现3个工具接口,分别为性能劣化感知工具,慢卡定界工具,报告输出工具。"
-
-
 @mcp.prompt(description="调用逻辑:1. 当用户询问特定任务ID的机器性能是否劣化时调用。2. 检测结果将决定后续流程走向。\
 3. 调用完成后如果出现劣化现象,则把当前工具得到的结果作为入参,调用slow_node_detection_tool方法 ,如果没有出现劣化现象,则调用报告工具返回报告给用户。\
 4. 本方法得到的结果必须再调用generate_report 生成报告给到用户"
@@ -42,8 +46,8 @@ def slow_node_perception_tool(task_id: str) -> PerceptionResult:
     入参 task_id ,如 192.168.2.122;
     返回 PerceptionResult 如果is_anomaly=false,该结果需要调用generate_report_tool再返回给用户;如果is_anomaly=True,该结果必须调用slow_node_detection_tool得到报告
     """
-    print("性能劣化感知工具 开启")
-    print("task_id = " + task_id)
+    logger.info("调用 性能劣化感知工具")
+    logger.info("task_id = " + task_id)
 
     with open(MODEL_CONFIG_PATH, 'r', encoding='utf-8') as reader:
         model_args = json.load(reader)
@@ -63,13 +67,13 @@ def slow_node_detection_tool(performance_data: PerceptionResult) -> AIJobDetectR
     performance_data: 感知工具返回的完整性能数据PerceptionResult;
     输出:AIJobDetectResult,该结果必须要调用generate_report_tool得到报告再返回给用户
     """
-    print("慢卡定界工具")
-    print("performance_data = " + str(performance_data))
-    print("task_id = " + performance_data.task_id)
+    logger.info("调用 慢卡定界工具")
+    logger.info("performance_data = " + str(performance_data))
+    logger.info("task_id = " + performance_data.task_id)
     sync_server_by_ip_and_type(performance_data.task_id, "detection")
     _res = slow_node_detection_api()
     _res = AIJobDetectResult.model_validate(_res)
-    print("result = " + str(_res))
+    logger.info("result = " + str(_res))
     return _res
 
 
@@ -91,7 +95,7 @@ def generate_report_tool(source_data: Union[PerceptionResult, AIJobDetectResult]
     2、细节:每条节点的具体卡号{objectId}、异常指标{kpiId}(其中:HcclAllGather表示集合通信库的AllGather时序序列指标;HcclReduceScatter表示集合通信库的ReduceScatter时序序列指标;HcclAllReduce表示集合通信库的AllReduce时序序列指标;),检测方法{methodType}(SPACE 多节点空间对比检测器,TIME 单节点时间检测器),以表格形式呈现;
     3、针对这个节点给出检测建议,如果是计算类型,建议检测卡的状态,算子下发以及算子执行的代码,对慢节点进行隔离;如果是网络问题,建议检测组网的状态,使用压测节点之间的连通状态;如果是存储问题,建议检测存储的磁盘以及用户脚本中的dataloader和保存模型代码。
     """
-    print("调用了报告工具,report_type = " + report_type.value)
+    logger.info("调用 报告工具,report_type = " + report_type.value)
     # 根据报告类型调用对应的生成方法
     if report_type == ReportType.normal:
         result = generate_normal_report(source_data)
@@ -99,7 +103,7 @@ def generate_report_tool(source_data: Union[PerceptionResult, AIJobDetectResult]
         result = generate_degraded_report(source_data)
     else:
         raise Exception("不支持的报告类型")
-    print("报告:", result)
+    logger.info(f"报告:{result}")
     return result
 
 
diff --git a/systrace_mcp/systrace_mcp/remote_file_fetcher.py b/systrace_mcp/systrace_mcp/remote_file_fetcher.py
index e20e669..9fb0ec3 100644
--- a/systrace_mcp/systrace_mcp/remote_file_fetcher.py
+++ b/systrace_mcp/systrace_mcp/remote_file_fetcher.py
@@ -7,7 +7,15 @@ import os
 from stat import S_ISDIR
 import sys
 from failslow.util.constant import MODEL_CONFIG_PATH
-
+import logging
+import sys
+# 配置日志系统
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
+    handlers=[logging.StreamHandler(sys.stdout)]
+)
+logger = logging.getLogger("systrace_mcpserver")
 FTP_CONFIG_PATH = "/etc/systrace/config/ftp_config.json"
 
 class UnsupportedSyncTypeError(Exception):
@@ -32,7 +40,7 @@ def load_config(config_file):
 
         # 验证配置结构
         if "servers" not in config or not isinstance(config["servers"], list):
-            print("错误: 配置文件格式不正确,缺少servers数组")
+            logger.error("错误: 配置文件格式不正确,缺少servers数组")
             return None
 
         # 验证每个服务器配置的必要项
@@ -44,7 +52,7 @@ def load_config(config_file):
         for idx, server in enumerate(config["servers"]):
             for key in required_keys:
                 if key not in server:
-                    print(f"错误: 服务器配置 #{idx + 1} 缺少必要项 {key}")
+                    logger.error(f"错误: 服务器配置 #{idx + 1} 缺少必要项 {key}")
                     return None
 
             # 标准化远程目录路径
@@ -59,10 +67,10 @@ def load_config(config_file):
         return config["servers"]
 
     except json.JSONDecodeError:
-        print(f"错误: 配置文件 {config_file} 格式不正确")
+        logger.error(f"错误: 配置文件 {config_file} 格式不正确")
         return None
     except Exception as e:
-        print(f"加载配置文件时发生错误: {str(e)}")
+        logger.error(f"加载配置文件时发生错误: {str(e)}")
         return None
 
 
@@ -71,7 +79,7 @@ def get_server_config(servers, target_ip):
     for server in servers:
         if server["ip"] == target_ip:
             return server
-    print(f"错误: 未找到IP为 {target_ip} 的服务器配置")
+    logger.error(f"错误: 未找到IP为 {target_ip} 的服务器配置")
     return None
 
 
@@ -104,19 +112,19 @@ def init_local_dir(local_dir):
                 try:
                     if os.path.isfile(item_path) or os.path.islink(item_path):
                         os.unlink(item_path)  # 删除文件或软链接
-                        print(f"删除文件: {item_path}")
+                        logger.info(f"删除文件: {item_path}")
                     elif os.path.isdir(item_path):
                         shutil.rmtree(item_path)  # 删除子目录及其内容
-                        print(f"删除目录: {item_path}")
+                        logger.info(f"删除目录: {item_path}")
                 except Exception as e:
-                    print(f"删除 {item_path} 失败: {e}")
-            print(f"已清空目录: {normalized_dir}")
+                    logger.error(f"删除 {item_path} 失败: {e}")
+            logger.info(f"已清空目录: {normalized_dir}")
         else:
-            print(f"目录已存在且为空: {normalized_dir}")
+            logger.info(f"目录已存在且为空: {normalized_dir}")
     else:
         # 创建目录(包括必要的父目录)
         os.makedirs(normalized_dir, exist_ok=True)
-        print(f"创建本地根目录: {normalized_dir}")
+        logger.info(f"创建本地根目录: {normalized_dir}")
 
     return normalized_dir
 
@@ -151,7 +159,7 @@ def get_remote_files_recursive(sftp, remote_base_dir, current_dir):
 
         return all_files
     except Exception as e:
-        print(f"获取远程文件列表失败 (目录: {current_dir}): {e}")
+        logger.error(f"获取远程文件列表失败 (目录: {current_dir}): {e}")
         return []
 
 
@@ -169,16 +177,16 @@ def download_new_files(sftp, remote_files, local_root):
 
         # 确保本地目录存在
         if not os.path.exists(local_file_dir):
             os.makedirs(local_file_dir, exist_ok=True)
-            print(f"创建本地子目录: {local_file_dir}")
+            logger.info(f"创建本地子目录: {local_file_dir}")
 
         # 检查文件是否需要下载或更新
         if not os.path.exists(local_file_path):
-            print(f"下载新文件: {file['remote_path']} -> {local_file_path}")
+            logger.info(f"下载新文件: {file['remote_path']} -> {local_file_path}")
             sftp.get(file["remote_path"], local_file_path)
         else:
             local_mtime = datetime.fromtimestamp(os.path.getmtime(local_file_path))
             if file["mtime"] > local_mtime:
-                print(f"更新文件: {file['remote_path']} -> {local_file_path}")
+                logger.info(f"更新文件: {file['remote_path']} -> {local_file_path}")
                 sftp.get(file["remote_path"], local_file_path)
 
@@ -202,7 +210,7 @@ def getEnable_config():
         # 解析配置文件的绝对路径
         abs_config_path = os.path.abspath(FTP_CONFIG_PATH)
         if not os.path.exists(abs_config_path):
-            print(f"错误: 配置文件 {abs_config_path} 不存在")
+            logger.error(f"错误: 配置文件 {abs_config_path} 不存在")
             return None
 
         with open(abs_config_path, 'r') as f:
@@ -210,10 +218,10 @@ def getEnable_config():
 
         return config["enable"]
 
     except json.JSONDecodeError:
-        print(f"错误: 配置文件 {FTP_CONFIG_PATH} 格式不正确")
+        logger.error(f"错误: 配置文件 {FTP_CONFIG_PATH} 格式不正确")
         return None
     except Exception as e:
-        print(f"加载配置文件时发生错误: {str(e)}")
+        logger.error(f"加载配置文件时发生错误: {str(e)}")
         return None
 
@@ -222,7 +230,7 @@ def sync_server_by_ip_and_type(target_ip, sync_type)->bool:
     # 判断一下同步功能是否开启
     if getEnable_config() == 'False':
         return True
-    print(f"{datetime.now()} - 开始同步服务器 {target_ip} 的 {sync_type} 类型文件...")
+    logger.info(f"开始同步服务器 {target_ip} 的 {sync_type} 类型文件...")
 
     # 验证同步类型
     if sync_type not in ["perception", "detection"]:
@@ -244,7 +252,7 @@ def sync_server_by_ip_and_type(target_ip, sync_type)->bool:
 
     # 解析并初始化本地目录
     resolved_local_dir = resolve_local_dir(local_dir)
-    print(f"使用本地目录: {resolved_local_dir}")
+    logger.info(f"使用本地目录: {resolved_local_dir}")
     local_root = init_local_dir(resolved_local_dir)
 
     # 建立SSH连接
@@ -268,7 +276,7 @@ def sync_server_by_ip_and_type(target_ip, sync_type)->bool:
         )
 
         if remote_files:
-            print(f"发现 {len(remote_files)} 个远程文件(包括所有子目录),开始同步...")
+            logger.info(f"发现 {len(remote_files)} 个远程文件(包括所有子目录),开始同步...")
             download_new_files(sftp, remote_files, local_root)
         else:
             raise ValueError(f"未发现 {sync_type} 类型的远程文件")
@@ -276,7 +284,7 @@ def sync_server_by_ip_and_type(target_ip, sync_type)->bool:
         # 关闭连接
         sftp.close()
         ssh.close()
-        print(f"{datetime.now()} - 服务器 {target_ip} 的 {sync_type} 类型文件同步完成")
+        logger.info(f"服务器 {target_ip} 的 {sync_type} 类型文件同步完成")
         return True
 
     except Exception as e:
-- 
Gitee
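
To exercise the widened anomaly_info union this patch introduces in mcp_data.py, the updated models can be validated against a hang-style payload. The snippet below is an illustrative sketch only, not part of the patch: it assumes the systrace_mcp package is importable, and all field values are made up.

    from systrace_mcp.mcp_data import PerceptionResult

    # Hang-style payload mirroring the dict built in run_slow_node_perception.
    # detect_point and hang_minutes are declared as str in AnomalyInfo2, so
    # string values are used here; real values from a run would differ.
    payload = {
        "is_anomaly": True,
        "anomaly_count_times": 0,
        "anomaly_info": [{
            "detect_point": "2025-08-15 11:16:09",
            "hang_minutes": "0.75",
        }],
    }

    result = PerceptionResult.model_validate(payload)
    # Shows which branch of Union[List[AnomalyInfo], List[AnomalyInfo2]] matched.
    print(type(result.anomaly_info[0]).__name__)
    print(result.model_dump_json())

Two caveats follow from pydantic v2's union handling. First, every field of AnomalyInfo has a default and extra keys are ignored by default, so a hang-style dict can also match List[AnomalyInfo]; a discriminated union, or extra="forbid" on the models, would make the chosen branch deterministic. Second, run_slow_node_perception fills detect_point with the hang_info list and hang_minutes with a float, while AnomalyInfo2 declares both fields as str, so validating a real hang payload would be expected to fail unless those field types are loosened.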