From 327a6506d84c2d146aa3af66aaa9976665132519 Mon Sep 17 00:00:00 2001
From: houxu
Date: Thu, 7 Aug 2025 20:42:15 +0800
Subject: [PATCH] add mcp server

---
 failslow/mcp_server/README.md                 |  51 +++
 failslow/mcp_server/__init__.py               |  17 +
 failslow/mcp_server/config/ftp_config.json    |  13 +
 failslow/mcp_server/config/openapi.yml        |  36 +++
 .../mcp_server/fail_slow_detection_api.py     | 235 ++++++++++++++
 failslow/mcp_server/mcp_data.py               |  20 ++
 failslow/mcp_server/mcp_server.py             | 110 +++++++
 failslow/mcp_server/openapi_server.py         | 129 ++++++++
 failslow/mcp_server/remote_file_fetcher.py    | 292 ++++++++++++++++++
 failslow/mcp_server/report_api.py             |  80 +++++
 .../service/systrace-mcpserver.service        |  12 +
 failslow/mcp_server/setup.py                  |  46 +++
 12 files changed, 1041 insertions(+)
 create mode 100644 failslow/mcp_server/README.md
 create mode 100644 failslow/mcp_server/__init__.py
 create mode 100644 failslow/mcp_server/config/ftp_config.json
 create mode 100644 failslow/mcp_server/config/openapi.yml
 create mode 100644 failslow/mcp_server/fail_slow_detection_api.py
 create mode 100644 failslow/mcp_server/mcp_data.py
 create mode 100644 failslow/mcp_server/mcp_server.py
 create mode 100644 failslow/mcp_server/openapi_server.py
 create mode 100644 failslow/mcp_server/remote_file_fetcher.py
 create mode 100644 failslow/mcp_server/report_api.py
 create mode 100644 failslow/mcp_server/service/systrace-mcpserver.service
 create mode 100644 failslow/mcp_server/setup.py

diff --git a/failslow/mcp_server/README.md b/failslow/mcp_server/README.md
new file mode 100644
index 0000000..b590f8f
--- /dev/null
+++ b/failslow/mcp_server/README.md
@@ -0,0 +1,51 @@
+# README
+
+# Installation and Deployment
+## Prerequisites
+Supported Python versions: 3.7+.
+failslow depends on the communication-operator data collected by systrace, so complete the communication-operator collection for the training task first.
+
+failslow-mcpserver can read the systrace communication-operator data either locally or from a remote target server; the data path must be specified in the configuration file.
+failslow-openapi can likewise read the systrace communication-operator data locally or from a remote target server; the data path must be specified in the configuration file.
+
+## Install and Run from Source (for developers)
+### Download the source
+    git clone https://gitee.com/openeuler/sysTrace.git
+### Install failslow
+Run the following command in the project's ./systrace directory:
+python3 setup.py install
+### Run
+systrace-failslow
+
+### Install mcpserver
+Run the following command in the project's ./systrace/mcp_server directory:
+python3 setup.py install
+### Run
+systrace-mcpserver    # starts the MCP server on port 12145
+
+systrace-openapi      # starts the OpenAPI server on port 12146
+
+
+To fetch data from a remote server, edit ./config/ftp_config.json:
+~~~json
+{
+    "servers": [
+        {
+            "ip": "192.168.122.196",      # IP of the remote target server
+            "port": 22,                   # SSH port of the remote target server
+            "user": "root",               # username
+            "password": "Huawei12#$",     # password
+            "perception_remote_dir": "/home/hx/sysTrace_dataloader/timeline",  # remote path of the systrace timeline data
+            "detection_remote_dir": "/home/hx/sysTrace_dataloader/mspti"       # remote path of the systrace mspti data
+        }
+    ],
+    "enable": "False"                     # "True" fetches data remotely; "False" analyzes local files only
+}
+~~~
+
+
+### Data Analysis
+**Operator execution**: about 3 ms; anomalies caused by slow computation take 7-8 ms.
+**Operator dispatch**: the time from operator dispatch to the start of execution, about 600 ms.
+**Slow communication**: sendrecv ranges from tens of ms up to 1200 ms.
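+
+### Invocation Example
+A minimal sketch of querying the OpenAPI service with Python `requests`, assuming the service is running locally on port 12146; the target IP below is illustrative:
+~~~python
+import requests
+
+# Query the fail-slow perception/detection endpoint for one node.
+resp = requests.get(
+    "http://127.0.0.1:12146/slow-node/systrace",
+    params={"ip": "192.168.122.196"},  # IP of the node to analyze (illustrative)
+    timeout=600,                       # the analysis may take a while
+)
+resp.raise_for_status()
+body = resp.json()
+print(body["data"]["report_type"])     # "normal" or "anomaly"
+print(body["data"]["report"])          # generated report content
+~~~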
diff --git a/failslow/mcp_server/__init__.py b/failslow/mcp_server/__init__.py
new file mode 100644
index 0000000..9265555
--- /dev/null
+++ b/failslow/mcp_server/__init__.py
@@ -0,0 +1,17 @@
+# coding=utf-8
+"""
+Copyright (c) Huawei Technologies Co., Ltd. 2020-2028. All rights reserved.
+Description:
+FileName: __init__.py
+Author: h30077985/houxu
+Create Date: 2025/7/8 19:24
+Notes:
+
+"""
+
+def main():
+    pass
+
+
+if __name__ == "__main__":
+    main()
diff --git a/failslow/mcp_server/config/ftp_config.json b/failslow/mcp_server/config/ftp_config.json
new file mode 100644
index 0000000..31dfadb
--- /dev/null
+++ b/failslow/mcp_server/config/ftp_config.json
@@ -0,0 +1,13 @@
+{
+  "servers": [
+    {
+      "ip": "192.168.159.130",
+      "port": 22,
+      "user": "root",
+      "password": "Huawei12#$",
+      "perception_remote_dir": "/home/sysTrace_dataloader/timeline",
+      "detection_remote_dir": "/home/sysTrace_dataloader/mspti"
+    }
+  ],
+  "enable": "False"
+}
diff --git a/failslow/mcp_server/config/openapi.yml b/failslow/mcp_server/config/openapi.yml
new file mode 100644
index 0000000..0ffdf46
--- /dev/null
+++ b/failslow/mcp_server/config/openapi.yml
@@ -0,0 +1,36 @@
+openapi: 3.0.0
+info:
+  title: systrace O&M API
+  description: Performs performance-degradation perception, slow-node localization and report generation, returning a detailed analysis report with professional interpretation
+  version: 1.0.0
+servers:
+  - url: http://76.53.17.51:12140
+    description: performance detection service address
+paths:
+  /slow-node/systrace:
+    get:
+      summary: Degradation perception, localization and report generation
+      description: >-
+        Detects whether a machine (server) has degraded and generates a report from the analysis result
+      parameters:
+        - name: ip
+          in: query
+          required: true
+          schema:
+            type: string
+            example: "9.82.201.111"
+          description: IP address of the node whose metrics are to be collected
+      responses:
+        '200':
+          description: the returned result
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ApiResponse'
+components:
+  schemas:
+    ApiResponse:
+      type: object
+      properties:
+        data:
+          type: object
diff --git a/failslow/mcp_server/fail_slow_detection_api.py b/failslow/mcp_server/fail_slow_detection_api.py
new file mode 100644
index 0000000..2d88d76
--- /dev/null
+++ b/failslow/mcp_server/fail_slow_detection_api.py
@@ -0,0 +1,235 @@
+'''
+env
+FAIL_SLOW_STOP: controls early stop of fail-slow detection
+2025-07-21: add data matching for L0 data covering
+'''
+import json
+import os
+import time
+from datetime import datetime, timezone
+from typing import Dict
+
+import pandas as pd
+
+from failslow.util.constant import AnomalyType
+from failslow.util.logging_utils import get_default_logger
+from failslow.dataloader.step_time_reader import StepReader
+
+logger = get_default_logger(__name__)
+
+DATA_QUEUE = pd.DataFrame({'time': [], 'step_time': []})
+DROP_DATA_LENGTH = 0
+
+
+def detect_step_time_anomalies(data_df: pd.DataFrame, model_args: Dict):
+    """
+    Detect anomalies in the step_time series and record their details.
+
+    :param data_df: DataFrame with 'time' and 'step_time' columns
+    :param model_args: model config; uses steps_window_size (moving-average
+        window size), k_sigma (threshold as a multiple of the standard
+        deviation) and anomaly_degree_thr (minimum relative deviation from
+        the moving average to count as an anomaly)
+    :return: dict with the anomaly flag, count, details, type and time range
+    """
+    window_size = model_args.get("steps_window_size", 10)
+    k_sigma_threshold = model_args.get("k_sigma", 2)
+    anomaly_degree_thr = model_args.get("anomaly_degree_thr", 0.2)
+    anomalies = []
+    step_times = data_df["step_time"]
+    timestamps = data_df["time"]
+    for i in range(len(step_times)):
+        if i < window_size:
+            continue
+
+        moving_average = sum(step_times[i - window_size:i]) / window_size
+
+        variance = sum((x - moving_average) ** 2 for x in step_times[i - window_size:i]) / window_size
+        std_dev = variance ** 0.5
+
+        current_anomaly = False
+        current_step_time = step_times[i]
+
+        diff = current_step_time - moving_average
+        if diff > k_sigma_threshold * std_dev:
+            anomaly_degree = diff / moving_average
+            if anomaly_degree > anomaly_degree_thr:
+                current_anomaly = True
+
+        # Confirm the anomaly with the next step to filter one-off spikes.
+        if current_anomaly and i + 1 < len(step_times):
+            next_step_time = step_times[i + 1]
+            next_diff = next_step_time - moving_average
+            if next_diff > k_sigma_threshold * std_dev:
+                next_anomaly_degree = next_diff / moving_average
+                if next_anomaly_degree > anomaly_degree_thr:
+                    anomalies.append(
+                        {"training_step": i,
+                         "anomaly_time": datetime.fromtimestamp(timestamps[i] / 1000).strftime('%Y-%m-%d %H:%M:%S'),
+                         "anomaly_degree": round(anomaly_degree, 3),
+                         "anomaly_training_time": f"{current_step_time}ms",
+                         "normal_training_time": f"{moving_average}ms"})
+
+    anomaly_info = {}
+    if anomalies:
+        anomaly_info["is_anomaly"] = True
+        anomaly_info["anomaly_count_times"] = len(anomalies)
+        anomaly_info["anomaly_info"] = anomalies
+        anomaly_info["anomaly_type"] = AnomalyType.fail_slow
+    else:
+        anomaly_info["is_anomaly"] = False
+        anomaly_info["anomaly_count_times"] = 0
+        anomaly_info["anomaly_info"] = []
+        anomaly_info["anomaly_type"] = AnomalyType.normal
+    anomaly_info["start_time"] = int(timestamps.iloc[0])
+    anomaly_info["end_time"] = int(timestamps.iloc[-1])
+    return anomaly_info
+
+
+def write_anomaly_info(anomaly_info: Dict, fail_slow_perception_path: str, file_ext: str = ".json"):
+    now_time = datetime.now(timezone.utc).astimezone()
+    now_timestamp = int(now_time.timestamp())
+    anomaly_type = anomaly_info.get("anomaly_type", AnomalyType.fail_slow)
+    fail_slow_perception_path = os.path.join(fail_slow_perception_path,
+                                             f"fail_slow_perception_result_{anomaly_type}_{now_timestamp}{file_ext}")
+
+    try:
+        with open(fail_slow_perception_path, 'w', encoding='utf-8') as json_file:
+            json.dump(anomaly_info, json_file, ensure_ascii=False, indent=4)
+        logger.info(f"anomaly info {anomaly_info}")
+        logger.info(f"writing result to {fail_slow_perception_path}")
+    except Exception as e:
+        logger.error(f"failed to write result: {e}")
+
+
+def get_extract_func_str(log_type: str):
+    extract_func_dict = {
+        "timeline": "get_step_data_from_timeline",
+        "log": "get_step_data_from_training_log",
+    }
+
+    return extract_func_dict.get(log_type, None)
+
+
+def update_queue_data(data: pd.DataFrame, max_data_queue_steps: int):
+    global DATA_QUEUE
+    global DROP_DATA_LENGTH
+    if len(DATA_QUEUE) > 0:
+        history_data_length = len(DATA_QUEUE)
+        DATA_QUEUE = pd.concat([DATA_QUEUE, data], axis=0, ignore_index=True)
+        if len(DATA_QUEUE) > max_data_queue_steps:
+            start_data_length = min(len(data), history_data_length)
+            DROP_DATA_LENGTH += start_data_length
+            DATA_QUEUE = DATA_QUEUE[start_data_length:].reset_index(drop=True)
+    else:
+        DATA_QUEUE = data
+
+
+def check_input_file(training_log: str, log_type: str):
+    if log_type == "timeline":
+        if os.path.isfile(training_log):
+            return training_log
+        else:
+            dir_path = os.path.dirname(training_log)
+            for file in os.listdir(dir_path):
+                if file.endswith("00000.timeline"):
+                    return os.path.join(dir_path, file)
+    return training_log
+
+
+def run_slow_node_perception(args: Dict, task_id: str):
+    training_log = args.get("training_log", "./log/rank0_mindformer.log")
+    fail_slow_perception_result = args.get("fail_slow_perception_path", "/log")
+    os.makedirs(fail_slow_perception_result, exist_ok=True)
+    log_type = args.get("log_type", "timeline")
+    training_log = check_input_file(training_log, log_type)
+
+    task_stable_step = args.get("task_stable_step", 2)  # only for the first detection round
+    fail_slow_span_mins = args.get("fail_slow_span_mins", 0.1)  # detection interval
+    max_data_queue_steps = args.get("max_data_queue_steps", 100)
+    min_startup_detection_steps = args.get("min_startup_detection_steps", 10)
+    hang_times_mins_thr = args.get("hang_times_mins_thr", 0.5)
+
+    detecting_range_steps = [0, 0]
+    first_flag = False
+    hang_info = []
+    hang_time_stamp = []
+    timer_flag = False
+
+    step_reader = StepReader()
+    log_extract_func = getattr(step_reader, get_extract_func_str(log_type))
+
+    if timer_flag:
+        time.sleep(fail_slow_span_mins * 60)
+    timer_flag = True
+
+    data = log_extract_func(training_log)
+    update_queue_data(data, max_data_queue_steps)
+    training_steps = len(DATA_QUEUE)
+    if not training_steps:
+        logger.info("training data is empty.")
+        return "training data is empty."
+
+    # If no new steps arrived, record the stall time; this also skips the
+    # model-initialization phase.
+    if training_steps == (detecting_range_steps[1]) and detecting_range_steps[1] and (not step_reader.is_update):
+        logger.info("start hang detection")
+        now_time = datetime.now(timezone.utc).astimezone()
+        now_time_stamp = now_time.timestamp()
+        format_str = '%Y-%m-%d %H:%M:%S %z'
+        now_time_str = now_time.strftime(format_str)
+        hang_time_stamp.append(now_time_stamp)
+        hang_info.append(now_time_str)
+        hang_times = round((now_time_stamp - hang_time_stamp[0]) / 60, 2)
+        logger.info(f"hang time min: {hang_times}")
+        if hang_time_stamp and hang_times > hang_times_mins_thr:
+            # Record the hang event.
+            anomaly_info = {
+                "is_anomaly": True,
+                "anomaly_count_times": 1,
+                "anomaly_info": [{
+                    "detect_point": hang_info,
+                    "hang_minutes": hang_times
+                }],
+                "anomaly_type": AnomalyType.hang,
+                "start_time": int(hang_time_stamp[0] * 1000),
+                "end_time": int(hang_time_stamp[-1] * 1000)
+            }
+            logger.info(f"hang detection finds the training process hanging since: {hang_info[0]}")
+            write_anomaly_info(anomaly_info, fail_slow_perception_result)
+            return f"hang detection finds the training process hanging since: {hang_info[0]}"
+    else:
+        hang_info = []
+        hang_time_stamp = []
+
+    new_detecting_range_steps = [0, 0]
+    new_detecting_range_steps[1] = training_steps + DROP_DATA_LENGTH
+    if not detecting_range_steps[1]:
+        first_flag = True
+    else:
+        if first_flag:
+            # Second detection round: keep the previous start.
+            new_detecting_range_steps[0] = detecting_range_steps[0]
+        else:
+            # Subsequent rounds: advance the start to the window midpoint.
+            new_detecting_range_steps[0] = (detecting_range_steps[0] + detecting_range_steps[1]) // 2
+        first_flag = False
+    range_steps = new_detecting_range_steps[1] - new_detecting_range_steps[0]
+    if range_steps < min_startup_detection_steps:
+        logger.warning(
+            f"[Warning] detecting range step {range_steps} should be larger than {min_startup_detection_steps}.")
+        return f"[Warning] detecting range step {range_steps} should be larger than {min_startup_detection_steps}."
+
+    detecting_range_steps = new_detecting_range_steps
+    if first_flag:
+        detecting_range_steps[0] = task_stable_step
+    logger.info(
+        f"Detection data range: {detecting_range_steps}, data queue: {len(DATA_QUEUE)}, drop data length: {DROP_DATA_LENGTH}.")
+    detected_data = DATA_QUEUE.loc[(detecting_range_steps[0] - DROP_DATA_LENGTH): (
+            detecting_range_steps[1] - DROP_DATA_LENGTH)].reset_index(drop=True)
+    anomaly_info = detect_step_time_anomalies(detected_data, args)
+
+    write_anomaly_info(anomaly_info, fail_slow_perception_result)
+    fail_slow_stop_flag = os.getenv('FAIL_SLOW_STOP', 'False').lower() == "true"  # env hook, see module docstring
+    anomaly_info["task_id"] = task_id
+    return anomaly_info
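+
+# Usage sketch (illustrative; the task id below is an assumption, mirroring
+# how mcp_server.py invokes this module):
+#
+#   import json
+#   from failslow.util.constant import MODEL_CONFIG_PATH
+#   with open(MODEL_CONFIG_PATH, 'r', encoding='utf-8') as reader:
+#       model_args = json.load(reader)
+#   result = run_slow_node_perception(model_args, task_id="192.168.2.122")
+#   print(result)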
diff --git a/failslow/mcp_server/mcp_data.py b/failslow/mcp_server/mcp_data.py
new file mode 100644
index 0000000..a1cadb5
--- /dev/null
+++ b/failslow/mcp_server/mcp_data.py
@@ -0,0 +1,20 @@
+from typing import List
+
+from typing_extensions import TypedDict
+
+
+class AnomalyInfo(TypedDict):
+    """Details of one degradation event"""
+    metric_name: str      # name of the degraded metric
+    threshold: float      # detection threshold
+    actual_value: float   # observed value
+    timestamp: int        # Unix timestamp of the event
+
+
+class PerceptionResult(TypedDict):
+    """Slow-node perception result"""
+    is_anomaly: bool                  # whether performance degradation occurred
+    anomaly_count_times: int          # number of degradation events
+    anomaly_info: List[AnomalyInfo]   # degradation details
+    start_time: int                   # degradation start time, Unix timestamp in milliseconds
+    end_time: int                     # degradation end time, Unix timestamp in milliseconds
+    anomaly_type: str                 # degradation type
+    task_id: str                      # service IP
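+
+# Illustrative instance (all values made up) showing the shape consumed by the
+# MCP tools in mcp_server.py:
+#
+#   example: PerceptionResult = {
+#       "is_anomaly": True,
+#       "anomaly_count_times": 1,
+#       "anomaly_info": [{"metric_name": "step_time", "threshold": 3.0,
+#                         "actual_value": 7.5, "timestamp": 1753088535000}],
+#       "start_time": 1753088400000,
+#       "end_time": 1753088535000,
+#       "anomaly_type": "fail_slow",
+#       "task_id": "192.168.2.122",
+#   }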
diff --git a/failslow/mcp_server/mcp_server.py b/failslow/mcp_server/mcp_server.py
new file mode 100644
index 0000000..5725021
--- /dev/null
+++ b/failslow/mcp_server/mcp_server.py
@@ -0,0 +1,110 @@
+import json
+import multiprocessing
+import os
+from typing import Union
+
+from mcp.server import FastMCP
+
+from failslow.response.response import AIJobDetectResult
+from failslow.util.logging_utils import get_default_logger
+from failslow.util.constant import MODEL_CONFIG_PATH
+from failslow.main import main as slow_node_detection_api
+
+from mcp_server.report_api import generate_normal_report, generate_degraded_report, generate_default_report
+from mcp_server.mcp_data import PerceptionResult
+from mcp_server.fail_slow_detection_api import run_slow_node_perception
+from mcp_server.remote_file_fetcher import sync_server_by_ip_and_type
+
+logger = get_default_logger(__name__)
+
+# Force the spawn start method on POSIX (Linux/macOS) only.
+if os.name == "posix":
+    multiprocessing.set_start_method("spawn", force=True)
+
+# Create the MCP server.
+mcp = FastMCP("SysTrace MCP Server", host="0.0.0.0", port=12145)
+
+
+@mcp.prompt(title="self_introduction",
+            description="Tool positioning")
+def self_introduction() -> str:
+    return ("For O&M and development engineers, with natural-language access and heuristic tuning. "
+            "Three tool interfaces are provided: performance-degradation perception, "
+            "slow-card localization, and report generation.")
+
+
+@mcp.prompt(title="degradation perception",
+            description="Call logic: 1. Call when the user asks whether the machine of a given task ID has degraded. "
+                        "2. The detection result decides the follow-up flow. "
+                        "3. If degradation is found, pass this tool's result to slow_node_detection_tool; "
+                        "otherwise call the report tool and return the report to the user. "
+                        "4. The result of this tool must always go through generate_report_tool before it reaches the user.")
+@mcp.tool(
+    name="slow_node_perception_tool",
+    title="Tool that checks whether the machine of the given task ID has performance degradation"
+)
+def slow_node_perception_tool(task_id: str) -> PerceptionResult:
+    """
+    Checks whether the machine identified by task_id shows performance degradation.
+    Argument: task_id, e.g. 192.168.2.122.
+    Returns a PerceptionResult. If is_anomaly is False, pass the result to
+    generate_report_tool before replying; if is_anomaly is True, the result
+    must go through slow_node_detection_tool to obtain the report.
+    """
+    logger.info("slow node perception tool started, task_id = %s", task_id)
+
+    with open(MODEL_CONFIG_PATH, 'r', encoding='utf-8') as reader:
+        model_args = json.load(reader)
+    sync_server_by_ip_and_type(task_id, "perception")
+    res = run_slow_node_perception(model_args, task_id)
+    return res
+
+
+@mcp.prompt(title="slow-card localization",
+            description="Call logic: 1. Call only when the perception tool returns is_anomaly=True. "
+                        "2. Takes the perception tool's full performance data as input. "
+                        "3. The result of this tool must go through generate_report_tool before it reaches the user.")
+@mcp.tool(name="slow_node_detection_tool",
+          title="Detection tool: localizes the concrete fault point once degradation is perceived")
+def slow_node_detection_tool(performance_data: PerceptionResult) -> AIJobDetectResult:
+    """
+    Slow-card localization tool, called when slow_node_perception_tool returns is_anomaly=True.
+    Input:
+        performance_data: the full PerceptionResult returned by the perception tool.
+    Output: AIJobDetectResult; the result must go through generate_report_tool before replying.
+    """
+    logger.info("slow node detection tool started, performance_data = %s", str(performance_data))
+    sync_server_by_ip_and_type(performance_data["task_id"], "detection")
+    _res = slow_node_detection_api()
+    logger.info(json.dumps(_res))
+    return _res
+
+
+@mcp.prompt(title="report generation tool",
+            description="After calling slow_node_perception_tool or slow_node_detection_tool, pass the result to generate_report_tool.")
+@mcp.tool()
+def generate_report_tool(source_data: Union[dict, str], report_type: str) -> dict:
+    """
+    Report tool: generates the final Markdown report.
+    Input:
+        source_data: the perception or localization result
+        report_type: whether degradation occurred, "normal" or "anomaly"
+    You are a professional performance-degradation analyst who assesses server health and writes
+    reports titled "AI Training Task Performance Diagnosis Report". Answer factually and do not
+    digress. The training task uses NPU cards, not GPUs, so never mention GPUs. When there is no
+    degradation, state the conclusion directly, without mentioning CPU, disk or similar metrics.
+    Current time: {{ time }}, usable as a time reference.
+    First decide whether degradation occurred: {report_type} is "normal" (no degradation) or "anomaly" (degradation).
+    Steps when there is no degradation:
+    1. Overview: take {start_time} and {end_time} as the time range; conclude that the AI training
+       task is running normally and will remain under monitoring.
+    Steps when degradation occurred:
+    1. Overview: take the detection time from {time}, the abnormal node count from {abnormalNodeCount},
+       and the anomaly categories {compute}{network}{storage} (true = abnormal, false = normal).
+    2. Details: for each node list the card ID {objectId}, the abnormal metric {kpiId} (HcclAllGather,
+       HcclReduceScatter and HcclAllReduce denote the AllGather/ReduceScatter/AllReduce time-series
+       metrics of the collective-communication library) and the detection method {methodType}
+       (SPACE: multi-node spatial comparison detector; TIME: single-node temporal detector), as a table.
+    3. Give per-node recommendations: for compute issues, check the card state and the operator
+       dispatch/execution code, and isolate the slow node; for network issues, check the fabric
+       state and stress-test inter-node connectivity; for storage issues, check the disks and the
+       dataloader and checkpoint-saving code in the user script.
+    """
+    logger.info("report tool called, report_type = %s", report_type)
+    # Dispatch to the matching generator.
+    if report_type == "normal":
+        return generate_normal_report(source_data)
+    elif report_type == "anomaly":
+        return generate_degraded_report(source_data)
+    else:
+        # Fallback report type.
+        return generate_default_report(source_data)
+
+
+def main():
+    # Initialize and start the service.
+    mcp.run(transport='sse')
+
+
+if __name__ == "__main__":
+    main()
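+
+# Client sketch (assumption: the `mcp` Python SDK's SSE client API; host and
+# port follow the FastMCP instance above):
+#
+#   import asyncio
+#   from mcp import ClientSession
+#   from mcp.client.sse import sse_client
+#
+#   async def demo():
+#       async with sse_client("http://127.0.0.1:12145/sse") as (read, write):
+#           async with ClientSession(read, write) as session:
+#               await session.initialize()
+#               result = await session.call_tool(
+#                   "slow_node_perception_tool", {"task_id": "192.168.2.122"})
+#               print(result)
+#
+#   asyncio.run(demo())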
diff --git a/failslow/mcp_server/openapi_server.py b/failslow/mcp_server/openapi_server.py
new file mode 100644
index 0000000..ea5b154
--- /dev/null
+++ b/failslow/mcp_server/openapi_server.py
@@ -0,0 +1,129 @@
+import json
+import multiprocessing
+import os
+from typing import Union, Dict, Any, Optional
+
+import uvicorn
+from fastapi import FastAPI, HTTPException, Query
+from fastapi.middleware.cors import CORSMiddleware
+from pydantic import BaseModel
+
+from failslow.response.response import AIJobDetectResult
+from failslow.main import main as slow_node_detection_api
+from failslow.util.logging_utils import get_default_logger
+from failslow.util.constant import MODEL_CONFIG_PATH
+from mcp_server.mcp_data import PerceptionResult
+from mcp_server.fail_slow_detection_api import run_slow_node_perception
+from mcp_server.remote_file_fetcher import sync_server_by_ip_and_type
+from mcp_server.report_api import generate_normal_report, generate_degraded_report, generate_default_report
+
+# Force the spawn start method on POSIX (Linux/macOS) only.
+if os.name == "posix":
+    multiprocessing.set_start_method("spawn", force=True)
+
+# Initialize logging.
+logger = get_default_logger(__name__)
+
+# Create the FastAPI application.
+app = FastAPI(title="systrace O&M API", version="1.0.0")
+
+# Configure CORS.
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"],
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"],
+)
+
+
+# Data models
+class ApiResponse(BaseModel):
+    """Generic API response model"""
+    data: Optional[Any] = None
+    code: int = 200
+    message: str = "success"
+
+
+# Tool implementations
+def slow_node_perception_tool(task_id: str) -> PerceptionResult:
+    """Checks whether the machine identified by task_id shows performance degradation."""
+    logger.info(f"slow node perception tool started, task_id = {task_id}")
+
+    try:
+        with open(MODEL_CONFIG_PATH, 'r', encoding='utf-8') as reader:
+            model_args = json.load(reader)
+    except Exception as e:
+        logger.error(f"failed to load model config: {str(e)}")
+        raise HTTPException(status_code=500, detail=f"failed to load model config: {str(e)}")
+
+    try:
+        sync_server_by_ip_and_type(task_id, "perception")
+        res = run_slow_node_perception(model_args, task_id)
+        res["task_id"] = task_id
+        logger.info(f"perception result: {str(res)}")
+        return res
+    except Exception as e:
+        logger.error(f"slow node perception tool failed: {str(e)}")
+        raise HTTPException(status_code=500, detail=f"slow node perception tool failed: {str(e)}")
+
+
+def slow_node_detection_tool(performance_data: PerceptionResult) -> AIJobDetectResult:
+    """Localizes the concrete fault point once degradation is perceived."""
+    logger.info(f"slow node detection tool started, performance_data = {str(performance_data)}")
+
+    try:
+        sync_server_by_ip_and_type(performance_data["task_id"], "detection")
+        _res = slow_node_detection_api(performance_data)
+        logger.info(f"detection result: {json.dumps(_res)}")
+        return _res
+    except Exception as e:
+        logger.error(f"slow node detection tool failed: {str(e)}")
+        raise HTTPException(status_code=500, detail=f"slow node detection tool failed: {str(e)}")
+
+
+def generate_report_tool(source_data: Union[dict, str], report_type: str) -> Union[str, Dict[str, Any]]:
+    """Generates the final report."""
+    logger.info(f"report tool called, report_type = {report_type}")
+
+    try:
+        if report_type == "normal":
+            report_content = generate_normal_report(source_data)
+        elif report_type == "anomaly":
+            report_content = generate_degraded_report(source_data)
+        else:
+            report_content = generate_default_report(source_data)
+
+        return report_content
+    except Exception as e:
+        logger.error(f"report tool failed: {str(e)}")
+        raise HTTPException(status_code=500, detail=f"report tool failed: {str(e)}")
+
+
+@app.get("/slow-node/systrace", response_model=ApiResponse)
+async def slow_node_perception(ip: str = Query("127.0.0.1", description="node IP address")):
+    """
+    systrace O&M endpoint: perception, optional localization, then report generation.
+    """
+    # 1. Run perception.
+    result = slow_node_perception_tool(ip)
+    # 2. Decide whether degradation occurred and, if so, run localization.
+    report_type = "anomaly" if result.get("is_anomaly", True) else "normal"
+    if result.get("is_anomaly"):
+        result = slow_node_detection_tool(result)
+    # 3. Generate the report.
+    report_content = generate_report_tool(result, report_type)
+
+    # 4. Return the result (perception outcome plus report).
+    return ApiResponse(data={
+        "report": report_content,
+        "report_type": report_type
+    })
+
+
+def main():
+    """Start the service."""
+    logger.info("starting the performance analysis OpenAPI service...")
+    uvicorn.run(app, host="0.0.0.0", port=12146)
+
+
+if __name__ == "__main__":
+    main()
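+
+# Illustrative response shape of GET /slow-node/systrace (values are made up):
+#
+#   {
+#       "data": {
+#           "report": {"reportName": "AI Training Task Performance Diagnosis Report", "...": "..."},
+#           "report_type": "anomaly"
+#       },
+#       "code": 200,
+#       "message": "success"
+#   }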
diff --git a/failslow/mcp_server/remote_file_fetcher.py b/failslow/mcp_server/remote_file_fetcher.py
new file mode 100644
index 0000000..e20e669
--- /dev/null
+++ b/failslow/mcp_server/remote_file_fetcher.py
@@ -0,0 +1,292 @@
+import json
+import os
+import shutil
+from datetime import datetime
+from stat import S_ISDIR
+
+import paramiko
+
+from failslow.util.constant import MODEL_CONFIG_PATH
+
+FTP_CONFIG_PATH = "/etc/systrace/config/ftp_config.json"
+
+
+class UnsupportedSyncTypeError(Exception):
+    """Raised for unsupported sync types."""
+
+    def __init__(self, sync_type):
+        self.sync_type = sync_type
+        super().__init__(f"unsupported sync type: {sync_type}, only 'perception' and 'detection' are supported")
+
+
+def load_config(config_file):
+    """Load and parse the multi-server JSON config file."""
+    try:
+        # Resolve the absolute path of the config file.
+        abs_config_path = os.path.abspath(config_file)
+        if not os.path.exists(abs_config_path):
+            print(f"error: config file {abs_config_path} does not exist")
+            return None
+
+        with open(abs_config_path, 'r') as f:
+            config = json.load(f)
+
+        # Validate the config structure.
+        if "servers" not in config or not isinstance(config["servers"], list):
+            print("error: malformed config file, missing the servers array")
+            return None
+
+        # Validate the required keys of each server entry.
+        required_keys = [
+            "ip", "port", "user", "password",
+            "perception_remote_dir", "detection_remote_dir"
+        ]
+
+        for idx, server in enumerate(config["servers"]):
+            for key in required_keys:
+                if key not in server:
+                    print(f"error: server config #{idx + 1} is missing the required key {key}")
+                    return None
+
+            # Normalize the remote directory paths.
+            server["perception_remote_dir"] = server["perception_remote_dir"].replace("\\", "/")
+            if not server["perception_remote_dir"].endswith("/"):
+                server["perception_remote_dir"] += "/"
+
+            server["detection_remote_dir"] = server["detection_remote_dir"].replace("\\", "/")
+            if not server["detection_remote_dir"].endswith("/"):
+                server["detection_remote_dir"] += "/"
+
+        return config["servers"]
+
+    except json.JSONDecodeError:
+        print(f"error: config file {config_file} is not valid JSON")
+        return None
+    except Exception as e:
+        print(f"error while loading the config file: {str(e)}")
+        return None
+
+
+def get_server_config(servers, target_ip):
+    """Find the server config matching the given IP."""
+    for server in servers:
+        if server["ip"] == target_ip:
+            return server
+    print(f"error: no server config found for IP {target_ip}")
+    return None
+
+
+def resolve_local_dir(local_dir):
+    """
+    Resolve the local directory path, turning relative paths into absolute ones.
+    Relative paths are resolved against the current working directory, not the
+    location of the config file.
+    """
+    # Absolute paths are returned unchanged.
+    if os.path.isabs(local_dir):
+        return local_dir
+
+    # Relative paths are resolved against the current working directory.
+    return os.path.abspath(os.path.join(os.getcwd(), local_dir))
+
+
+def init_local_dir(local_dir):
+    """Initialize the local directory, ensuring it exists and is empty."""
+    # Resolve the path (handles relative paths).
+    resolved_dir = resolve_local_dir(local_dir)
+    normalized_dir = os.path.normpath(resolved_dir)
+
+    # The directory already exists.
+    if os.path.exists(normalized_dir):
+        # Remove its contents, if any.
+        if os.listdir(normalized_dir):
+            for item in os.listdir(normalized_dir):
+                item_path = os.path.join(normalized_dir, item)
+                try:
+                    if os.path.isfile(item_path) or os.path.islink(item_path):
+                        os.unlink(item_path)  # remove the file or symlink
+                        print(f"removed file: {item_path}")
+                    elif os.path.isdir(item_path):
+                        shutil.rmtree(item_path)  # remove the subdirectory and its contents
+                        print(f"removed directory: {item_path}")
+                except Exception as e:
+                    print(f"failed to remove {item_path}: {e}")
+            print(f"emptied directory: {normalized_dir}")
+        else:
+            print(f"directory already exists and is empty: {normalized_dir}")
+    else:
+        # Create the directory (including parents).
+        os.makedirs(normalized_dir, exist_ok=True)
+        print(f"created local root directory: {normalized_dir}")
+
+    return normalized_dir
+
+
+def get_remote_files_recursive(sftp, remote_base_dir, current_dir):
+    """Recursively list all files under a remote directory."""
+    all_files = []
+
+    try:
+        current_dir = current_dir.replace("\\", "/")
+        if not current_dir.endswith("/"):
+            current_dir += "/"
+
+        for entry in sftp.listdir_attr(current_dir):
+            entry_remote_path = f"{current_dir}{entry.filename}"
+
+            if entry.filename.startswith('.'):
+                continue
+
+            if S_ISDIR(entry.st_mode):
+                subdir_files = get_remote_files_recursive(sftp, remote_base_dir, entry_remote_path)
+                all_files.extend(subdir_files)
+            else:
+                relative_path = entry_remote_path[len(remote_base_dir):]
+                all_files.append({
+                    "name": entry.filename,
+                    "remote_path": entry_remote_path,
+                    "relative_path": relative_path,
+                    "mtime": datetime.fromtimestamp(entry.st_mtime)
+                })
+
+        return all_files
+
+    except Exception as e:
+        print(f"failed to list remote files (directory: {current_dir}): {e}")
+        return []
+
+
+def download_new_files(sftp, remote_files, local_root):
+    """Download new or updated files, preserving the directory structure."""
+    # Make sure the local root is fully resolved.
+    resolved_local_root = resolve_local_dir(local_root)
+
+    for file in remote_files:
+        local_relative_path = file["relative_path"].replace("/", os.sep)
+        # Build the full local path.
+        local_file_path = os.path.normpath(os.path.join(resolved_local_root, local_relative_path))
+        local_file_dir = os.path.dirname(local_file_path)
+
+        # Make sure the local directory exists.
+        if not os.path.exists(local_file_dir):
+            os.makedirs(local_file_dir, exist_ok=True)
+            print(f"created local subdirectory: {local_file_dir}")
+
+        # Download the file if it is new or outdated locally.
+        if not os.path.exists(local_file_path):
+            print(f"downloading new file: {file['remote_path']} -> {local_file_path}")
+            sftp.get(file["remote_path"], local_file_path)
+        else:
+            local_mtime = datetime.fromtimestamp(os.path.getmtime(local_file_path))
+            if file["mtime"] > local_mtime:
+                print(f"updating file: {file['remote_path']} -> {local_file_path}")
+                sftp.get(file["remote_path"], local_file_path)
+
+
+def get_server_config_by_ip(target_ip):
+    """Load the config and return the entry for the given IP, or None."""
+    servers = load_config(FTP_CONFIG_PATH)
+    if not servers:
+        return None
+
+    return get_server_config(servers, target_ip)
+
+
+def get_enable_config():
+    """Load the config file and return its 'enable' flag."""
+    try:
+        # Resolve the absolute path of the config file.
+        abs_config_path = os.path.abspath(FTP_CONFIG_PATH)
+        if not os.path.exists(abs_config_path):
+            print(f"error: config file {abs_config_path} does not exist")
+            return None
+
+        with open(abs_config_path, 'r') as f:
+            config = json.load(f)
+        return config["enable"]
+
+    except json.JSONDecodeError:
+        print(f"error: config file {FTP_CONFIG_PATH} is not valid JSON")
+        return None
+    except Exception as e:
+        print(f"error while loading the config file: {str(e)}")
+        return None
+
+
+def sync_server_by_ip_and_type(target_ip, sync_type) -> bool:
+    """Sync files of the given type from the server with the given IP."""
+    # Skip when remote sync is disabled (or the config is unreadable).
+    if get_enable_config() != "True":
+        return True
+    print(f"{datetime.now()} - start syncing {sync_type} files from server {target_ip}...")
+
+    # Validate the sync type.
+    if sync_type not in ["perception", "detection"]:
+        raise UnsupportedSyncTypeError(sync_type)
+    server_config = get_server_config_by_ip(target_ip)
+    if not server_config:
+        raise ValueError(f"no server config found for IP {target_ip}")
+    with open(MODEL_CONFIG_PATH, 'r', encoding='utf-8') as reader:
+        model_args = json.load(reader)
+    try:
+        # Pick the remote/local directories for the sync type.
+        if sync_type == "perception":
+            remote_dir = server_config["perception_remote_dir"]
+            local_dir = os.path.dirname(model_args["training_log"])
+        else:  # detection
+            remote_dir = server_config["detection_remote_dir"]
+            local_dir = model_args["root_path"]
+
+        # Resolve and initialize the local directory.
+        resolved_local_dir = resolve_local_dir(local_dir)
+        print(f"using local directory: {resolved_local_dir}")
+        local_root = init_local_dir(resolved_local_dir)
+
+        # Open the SSH connection.
+        ssh = paramiko.SSHClient()
+        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+        ssh.connect(
+            server_config["ip"],
+            server_config["port"],
+            server_config["user"],
+            server_config["password"]
+        )
+
+        # Create the SFTP client.
+        sftp = ssh.open_sftp()
+
+        # Recursively list all remote files.
+        remote_files = get_remote_files_recursive(
+            sftp,
+            remote_dir,  # remote base directory
+            remote_dir   # initial current directory
+        )
+
+        if remote_files:
+            print(f"found {len(remote_files)} remote files (including all subdirectories), starting sync...")
+            download_new_files(sftp, remote_files, local_root)
+        else:
+            raise ValueError(f"no remote files of type {sync_type} found")
+
+        # Close the connections.
+        sftp.close()
+        ssh.close()
+        print(f"{datetime.now()} - finished syncing {sync_type} files from server {target_ip}")
+        return True
+
+    except Exception as e:
+        raise ValueError(f"failed to fetch remote systrace data: {e}")
+
+
+# Usage example
+if __name__ == "__main__":
+    # Sync perception-type data from the matching server in ftp_config.json.
+    sync_server_by_ip_and_type("9.13.100.7", "perception")
+
+    # Sync detection-type data:
+    # sync_server_by_ip_and_type("76.53.17.51", "detection")
diff --git a/failslow/mcp_server/report_api.py b/failslow/mcp_server/report_api.py
new file mode 100644
index 0000000..abe1dba
--- /dev/null
+++ b/failslow/mcp_server/report_api.py
@@ -0,0 +1,80 @@
+from datetime import datetime
+
+
+def generate_normal_report(data: dict) -> dict:
+    """Generate the report for the no-degradation case."""
+    # Convert the millisecond timestamps into readable times.
+    timestamp = data.get("start_time")
+    start_time = datetime.fromtimestamp(timestamp // 1000).strftime("%Y-%m-%d %H:%M:%S") if timestamp else "unknown time"
+    timestamp = data.get("end_time")
+    end_time = datetime.fromtimestamp(timestamp // 1000).strftime("%Y-%m-%d %H:%M:%S") if timestamp else "unknown time"
+    data["start_time"] = start_time
+    data["end_time"] = end_time
+
+    return data
+
+
+def generate_degraded_report(data: dict) -> dict:
+    """
+    Generate the JSON report for the degraded case.
+
+    Args:
+        data: dict with the device status information
+
+    Returns:
+        the formatted JSON report as a dict
+    """
+    # Convert the timestamp into a readable time.
+    timestamp = data.get("timestamp")
+    detect_time = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S") if timestamp else "unknown time"
+    # Extract the anomaly details.
+    abnormal_detail = data.get("abnormalDetail", [])
+    abnormal_count = len(abnormal_detail)
+
+    # Collect the abnormal node details.
+    abnormal_nodes = []
+    for abnormal in abnormal_detail:
+        abnormal_nodes.append({
+            "objectId": abnormal.get("objectId"),
+            "serverIp": abnormal.get("serverIp"),
+            "deviceInfo": abnormal.get("deviceInfo"),
+            "methodType": abnormal.get("methodType"),
+            "kpiId": abnormal.get("kpiId"),
+            "relaIds": abnormal.get("relaIds", [])
+        })
+
+    # Collect the normal node information.
+    normal_nodes = [item["deviceInfo"] for item in data.get("normalDetail", [])]
+
+    # Build the JSON report.
+    report = {
+        "reportName": "AI Training Task Performance Diagnosis Report",
+        "overview": {
+            "detectTime": detect_time,
+            "abnormalNodeCount": abnormal_count,
+            "compute": data.get("compute"),
+            "network": data.get("network"),
+            "storage": data.get("storage"),
+        },
+        "abnormalNodes": abnormal_nodes,
+        "normalNodes": {
+            "count": len(normal_nodes),
+            "devices": normal_nodes
+        },
+        "errorMessage": data.get("errorMsg", "")
+    }
+
+    return report
+
+
+def generate_default_report(data: dict) -> dict:
+    """Generate the fallback report (when the type is unrecognized) as a JSON dict."""
+    return {
+        "report_title": "Machine Performance Analysis Report",
+        "warning": "unrecognized report type; raw data summary below",
+        "raw_data": data,
+        "report_type": "default"
+    }
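+
+# Illustrative output of generate_degraded_report (all values are made up):
+#
+#   {
+#       "reportName": "AI Training Task Performance Diagnosis Report",
+#       "overview": {"detectTime": "2025-07-21 12:02:15", "abnormalNodeCount": 1,
+#                    "compute": True, "network": False, "storage": False},
+#       "abnormalNodes": [{"objectId": "3", "serverIp": "192.168.2.122",
+#                          "deviceInfo": "npu-3", "methodType": "TIME",
+#                          "kpiId": "HcclAllGather", "relaIds": []}],
+#       "normalNodes": {"count": 7, "devices": ["npu-0", "npu-1", "..."]},
+#       "errorMessage": ""
+#   }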
diff --git a/failslow/mcp_server/service/systrace-mcpserver.service b/failslow/mcp_server/service/systrace-mcpserver.service
new file mode 100644
index 0000000..8964806
--- /dev/null
+++ b/failslow/mcp_server/service/systrace-mcpserver.service
@@ -0,0 +1,12 @@
+[Unit]
+Description=A-Ops fail slow detection mcp service
+After=network.target
+
+[Service]
+Type=exec
+ExecStart=/usr/bin/systrace-mcpserver
+Restart=on-failure
+RestartSec=1
+
+[Install]
+WantedBy=multi-user.target
\ No newline at end of file
diff --git a/failslow/mcp_server/setup.py b/failslow/mcp_server/setup.py
new file mode 100644
index 0000000..0fef521
--- /dev/null
+++ b/failslow/mcp_server/setup.py
@@ -0,0 +1,46 @@
+#!/usr/bin/python3
+# ******************************************************************************
+# Copyright (c) 2022 Huawei Technologies Co., Ltd.
+# sysTrace is licensed under Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+#          http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+# See the Mulan PSL v2 for more details.
+# ******************************************************************************/
+
+import os
+from glob import glob
+
+from setuptools import setup, find_packages
+
+# Remove a stale service file from an earlier install, if present.
+ser = "/usr/lib/systemd/system/systrace-mcpserver.service"
+if os.path.isfile(ser):
+    os.remove(ser)
+
+setup(
+    name="systrace_mcpserver",
+    version="1.0.0",
+    author="xu hou",
+    author_email="houxu5@h-partners.com",
+    description="MCP server for sysTrace fail-slow detection for AI model training and inference",
+    url="https://gitee.com/openeuler/sysTrace",
+    keywords=["Fail Slow Detection", "Group Compare", "AI Model", "MCP Server"],
+    packages=find_packages(where=".", exclude=("tests", "tests.*")),
+    data_files=[
+        ('/etc/systrace/config/', glob('config/ftp_config.json')),
+        ('/usr/lib/systemd/system/', glob('service/*')),
+    ],
+    install_requires=[
+        "systrace_failslow",
+        "mcp==1.10.1",
+        "paramiko"
+    ],
+    entry_points={
+        "console_scripts": [
+            "systrace-mcpserver=mcp_server.mcp_server:main",
+            "systrace-openapi=mcp_server.openapi_server:main"
+        ]
+    }
+)
--
Gitee