diff --git a/distribution/linux/euler-copilot-shell.spec b/distribution/linux/euler-copilot-shell.spec index 086e82deb12979900aac79d1ab39438446366836..5a2a83e8aff456000f0835d541f4f265f29858c1 100644 --- a/distribution/linux/euler-copilot-shell.spec +++ b/distribution/linux/euler-copilot-shell.spec @@ -34,6 +34,8 @@ openEuler Intelligence 智能 Shell 是一个智能命令行程序。 # 部署安装工具子包 %package -n openeuler-intelligence-installer Summary: openEuler Intelligence 部署安装脚本 +Requires: python3-aiohttp +Requires: python3-requests BuildArch: noarch %description -n openeuler-intelligence-installer diff --git a/scripts/deploy/1-check-env/check_env.sh b/scripts/deploy/1-check-env/check_env.sh index 718ea4b44fc6b63a27b08c26a41c4c3e0eb85519..021034e7837e7f14e974781c127ec1fd92a6e72b 100644 --- a/scripts/deploy/1-check-env/check_env.sh +++ b/scripts/deploy/1-check-env/check_env.sh @@ -181,8 +181,8 @@ function check_user { function check_version { local current_version_id="$1" - local supported_versions=("${@:2}") - local sp="$3" + local sp="$2" + local supported_versions=("${@:3}") echo -e "${COLOR_INFO}[Info] 当前操作系统版本为:$current_version_id LTS-$sp${COLOR_RESET}" for version_id in "${supported_versions[@]}"; do if [[ "$current_version_id" == "$version_id" ]]; then @@ -226,7 +226,7 @@ function check_os_version { case $id in "openEuler") local supported_versions=("22.03" "24.03" "25.03" "25.09") - check_version "$version" "${supported_versions[@]}" "$sp" + check_version "$version" "$sp" "${supported_versions[@]}" ;; "hce") echo -e "${COLOR_INFO}[Info] 检测到 HCE 发行版,跳过版本检查${COLOR_RESET}" diff --git a/scripts/deploy/2-install-dependency/install_openEulerIntelligence.sh b/scripts/deploy/2-install-dependency/install_openEulerIntelligence.sh index 761f4e0a77b96aca40c31098a0ab6fd8ea884b69..e08ded032aa7d174579989d45af3b555a0b7b847 100644 --- a/scripts/deploy/2-install-dependency/install_openEulerIntelligence.sh +++ b/scripts/deploy/2-install-dependency/install_openEulerIntelligence.sh @@ -526,49 +526,13 @@ check_pip_framework() { # 根据 Python 版本选择包列表 declare -A REQUIRED_PACKAGES if [[ "$python_version" =~ ^3\.(11|[2-9][0-9])$ ]]; then - # Python 3.11 或更新版本,使用当前列表 + # Python 3.11 或更新版本,检查并安装 DNF 源中缺失的 pip 依赖 REQUIRED_PACKAGES=( ["pymongo"]="" - ["requests"]="" ["pydantic"]="" - ["aiohttp"]="" ) elif [[ "$python_version" =~ ^3\.(9|10)$ ]]; then - # Python 3.9 或 3.10,使用完整列表 - REQUIRED_PACKAGES=( - ["requests"]="" - ["aiohttp"]="" - ["aiofiles"]="24.1.0" - ["asyncer"]="0.0.8" - ["asyncpg"]="0.30.0" - ["cryptography"]="44.0.2" - ["fastapi"]="0.115.12" - ["httpx"]="0.28.1" - ["httpx-sse"]="0.4.0" - ["jinja2"]="3.1.6" - ["jionlp"]="1.5.20" - ["jsonschema"]="4.23.0" - ["lancedb"]="0.21.2" - ["minio"]="7.2.15" - ["ollama"]="0.5.1" - ["openai"]="1.91.0" - ["pandas"]="2.2.3" - ["pgvector"]="0.4.1" - ["pillow"]="10.3.0" - ["pydantic"]="2.11.7" - ["pymongo"]="4.12.1" - ["python-jsonpath"]="1.3.0" - ["python-magic"]="0.4.27" - ["python-multipart"]="0.0.20" - ["pytz"]="2025.2" - ["pyyaml"]="6.0.2" - ["rich"]="13.9.4" - ["sqids"]="0.5.1" - ["sqlalchemy"]="2.0.41" - ["tiktoken"]="0.9.0" - ["toml"]="0.10.2" - ["uvicorn"]="0.34.0" - ) + # Python 3.9 或 3.10,RPM 包安装过程已处理 pip 依赖 # 对于 Python 3.9,单独安装 MCP 的 wheel 包 local wheel_path="../5-resource/pip/mcp-1.6.0-py3-none-any.whl" if [ -f "$wheel_path" ]; then @@ -579,13 +543,8 @@ check_pip_framework() { echo -e "${COLOR_WARNING}[Warning] Wheel 文件不存在: $wheel_path${COLOR_RESET}" fi else - echo -e "${COLOR_WARNING}[Warning] 不支持的 Python 版本: $python_version,使用默认列表${COLOR_RESET}" - REQUIRED_PACKAGES=( - ["pymongo"]="" - ["requests"]="" - ["pydantic"]="" - ["aiohttp"]="" - ) + echo -e "${COLOR_WARNING}[Warning] 不支持的 Python 版本: $python_version${COLOR_RESET}" + return 1 fi local need_install=0 diff --git a/scripts/deploy/5-resource/mcp-servers.rpmlist b/scripts/deploy/5-resource/mcp-servers.rpmlist deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/scripts/deploy/5-resource/mcp_config/perf_mcp/config.json b/scripts/deploy/5-resource/mcp_config/perf_mcp/config.json deleted file mode 100644 index ec33562a224a03631b83c10d7547b7380d80504e..0000000000000000000000000000000000000000 --- a/scripts/deploy/5-resource/mcp_config/perf_mcp/config.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "name": "perf 分析工具", - "overview": "perf 分析工具", - "description": "perf 分析工具", - "mcpType": "sse", - "author": "root", - "config": { - "env": {}, - "autoApprove": [], - "disabled": false, - "auto_install": false, - "description": "", - "timeout": 60, - "url": "http://127.0.0.1:12141/sse" - } -} \ No newline at end of file diff --git a/scripts/deploy/5-resource/sysTrace.rpmlist b/scripts/deploy/5-resource/sysTrace.rpmlist deleted file mode 100644 index fee84624ac6d5263269bf7fe67ebb094437b8779..0000000000000000000000000000000000000000 --- a/scripts/deploy/5-resource/sysTrace.rpmlist +++ /dev/null @@ -1,2 +0,0 @@ -sysTrace-failslow -sysTrace-mcpserver diff --git a/src/app/deployment/agent.py b/src/app/deployment/agent.py index 54e332f07fea2389bfcfa98ac514aafd52046860..de9788d7c06dcae7e01139bd0b44b33c537db46b 100644 --- a/src/app/deployment/agent.py +++ b/src/app/deployment/agent.py @@ -14,11 +14,13 @@ from __future__ import annotations import asyncio import json +import subprocess from dataclasses import dataclass from pathlib import Path from typing import TYPE_CHECKING, Any import httpx +import toml from config.manager import ConfigManager from log.manager import get_logger @@ -26,7 +28,7 @@ from log.manager import get_logger from .models import AgentInitStatus, DeploymentState if TYPE_CHECKING: - from collections.abc import Callable + from collections.abc import Awaitable, Callable logger = get_logger(__name__) @@ -73,6 +75,17 @@ class AgentInfo: mcp_services: list[str] +@dataclass +class AppConfig: + """应用配置信息(从 TOML 文件读取)""" + + app_type: str + name: str + description: str + mcp_path: list[str] + published: bool = True + + class McpConfigLoader: """MCP 配置加载器""" @@ -126,7 +139,7 @@ class ApiClient: def __init__(self, server_ip: str, server_port: int) -> None: """初始化 API 客户端""" self.base_url = f"http://{server_ip}:{server_port}" - self.timeout = 60.0 # httpx 使用浮点数作为超时 + self.timeout = 10.0 async def register_mcp_service(self, config: McpConfig) -> str: """注册 MCP 服务""" @@ -357,9 +370,9 @@ class AgentManager: self.config_manager = ConfigManager() resource_paths = [ - Path("/usr/lib/openeuler-intelligence/scripts/5-resource"), # 生产环境 - Path("scripts/deploy/5-resource"), # 开发环境(相对路径) - Path(__file__).parent.parent.parent / "scripts/deploy/5-resource", # 开发环境(绝对路径) + Path("/usr/lib/euler-copilot-framework/mcp_center"), # 生产环境 + Path("scripts/deploy/5-resource"), # 旧的开发环境(兼容) + Path(__file__).parent.parent.parent / "scripts/deploy/5-resource", # 旧的开发环境(绝对路径兼容) ] self.resource_dir = next((p for p in resource_paths if p.exists()), None) @@ -369,6 +382,9 @@ class AgentManager: logger.info("[DeploymentHelper] 使用资源路径: %s", self.resource_dir) self.mcp_config_dir = self.resource_dir / "mcp_config" + self.run_script_path = self.resource_dir / "run.sh" + self.service_dir = self.resource_dir / "service" + self.app_config_path = self.mcp_config_dir / "mcp_to_app_config.toml" async def initialize_agents( self, @@ -385,55 +401,58 @@ class AgentManager: self._report_progress(state, "[bold blue]开始初始化智能体...[/bold blue]", progress_callback) try: - # 预处理:检查必要的 RPM 包可用性 - rpm_availability_result = await self._check_prerequisite_packages_availability(state, progress_callback) - if rpm_availability_result == AgentInitStatus.SKIPPED: - return AgentInitStatus.SKIPPED - - # 安装必要的 RPM 包 - if not await self._install_prerequisite_packages(state, progress_callback): - return AgentInitStatus.FAILED - - # 加载配置 - configs = await self._load_mcp_configs(state, progress_callback) - if not configs: - return AgentInitStatus.FAILED - - # 处理 MCP 服务 - os_service_ids, systrace_service_ids = await self._process_all_mcp_services( - configs, - state, - progress_callback, - ) + # 执行所有初始化步骤 + return await self._execute_initialization_steps(state, progress_callback) - if not os_service_ids and not systrace_service_ids: - self._report_progress(state, "[red]所有 MCP 服务处理失败[/red]", progress_callback) - return AgentInitStatus.FAILED + except Exception: + error_msg = "智能体初始化失败" + self._report_progress(state, f"[red]{error_msg}[/red]", progress_callback) + logger.exception(error_msg) + return AgentInitStatus.FAILED - # 创建智能体 - default_app_id = await self._create_multiple_agents( - os_service_ids, - systrace_service_ids, - state, - progress_callback, - ) + async def _execute_initialization_steps( + self, + state: DeploymentState, + progress_callback: Callable[[DeploymentState], None] | None, + ) -> AgentInitStatus: + """执行所有初始化步骤""" + # 1. 安装 systemd 服务文件 + if not await self._install_service_files(state, progress_callback): + return AgentInitStatus.FAILED + + # 2. 运行脚本拉起 MCP Server 进程 + if not await self._start_mcp_servers(state, progress_callback): + return AgentInitStatus.FAILED + # 3. 验证 MCP Server 服务状态 + if not await self._verify_mcp_services(state, progress_callback): + return AgentInitStatus.FAILED + + # 4. 加载 MCP 配置并注册服务 + mcp_service_mapping = await self._register_all_mcp_services(state, progress_callback) + if not mcp_service_mapping: + return AgentInitStatus.FAILED + + # 5. 读取应用配置并创建智能体 + default_app_id = await self._create_agents_from_config( + mcp_service_mapping, + state, + progress_callback, + ) + + if default_app_id: self._report_progress( state, f"[bold green]智能体初始化完成! 默认 App ID: {default_app_id}[/bold green]", progress_callback, ) logger.info("智能体初始化成功完成,默认 App ID: %s", default_app_id) - - except Exception: - error_msg = "智能体初始化失败" - self._report_progress(state, f"[red]{error_msg}[/red]", progress_callback) - logger.exception(error_msg) - return AgentInitStatus.FAILED - - else: return AgentInitStatus.SUCCESS + # 如果没有创建任何智能体,显示警告并返回成功状态 + self._report_progress(state, "[yellow]未能创建任何智能体[/yellow]", progress_callback) + return AgentInitStatus.SUCCESS + def _report_progress( self, state: DeploymentState, @@ -445,563 +464,858 @@ class AgentManager: if callback: callback(state) - async def _load_mcp_configs( - self, - state: DeploymentState, - callback: Callable[[DeploymentState], None] | None, - ) -> list[tuple[Path, McpConfig]]: - """加载 MCP 配置""" - self._report_progress(state, "[cyan]加载 MCP 配置文件...[/cyan]", callback) - - config_loader = McpConfigLoader(self.mcp_config_dir) - configs = config_loader.load_all_configs() - - if not configs: - self._report_progress(state, "[yellow]未找到 MCP 配置文件[/yellow]", callback) - return [] - - self._report_progress(state, f"[green]成功加载 {len(configs)} 个 MCP 配置[/green]", callback) - return configs - - async def _process_all_mcp_services( + def _get_service_files( self, - configs: list[tuple[Path, McpConfig]], state: DeploymentState, callback: Callable[[DeploymentState], None] | None, - ) -> tuple[list[str], list[str]]: + operation_name: str, + ) -> list[Path] | None: """ - 处理所有 MCP 服务 + 获取服务文件列表的通用方法 Returns: - tuple[list[str], list[str]]: (os_service_ids, systrace_service_ids) + list[Path]: 服务文件列表,如果应该跳过操作则返回 None """ - os_service_ids = [] - systrace_service_ids = [] - - for config_path, config in configs: - self._report_progress(state, f"[magenta]处理 MCP 服务: {config.name}[/magenta]", callback) - - service_id = await self._process_mcp_service(config, state, callback) - if service_id: - # 根据配置路径判断是否为 sysTrace 相关服务 - if "systrace" in config_path.parent.name.lower() or "systrace" in config.name.lower(): - systrace_service_ids.append(service_id) - else: - os_service_ids.append(service_id) - else: - self._report_progress(state, f"[red]MCP 服务 {config.name} 处理失败[/red]", callback) - - return os_service_ids, systrace_service_ids - - async def _create_multiple_agents( - self, - os_service_ids: list[str], - systrace_service_ids: list[str], - state: DeploymentState, - callback: Callable[[DeploymentState], None] | None, - ) -> str: - """创建多个智能体,返回默认智能体 ID""" - default_app_id = None - - # 创建智能运维 Agent(如果有相应的服务) - if os_service_ids: + if not self.service_dir or not self.service_dir.exists(): self._report_progress( state, - f"[bold cyan]创建智能运维 Agent (包含 {len(os_service_ids)} 个 MCP 服务)[/bold cyan]", + f"[yellow]服务配置目录不存在: {self.service_dir},跳过{operation_name}[/yellow]", callback, ) + logger.warning("服务配置目录不存在: %s", self.service_dir) + return None - os_app_id = await self.api_client.create_agent( - "智能运维", - "openEuler 智能运维助手", - os_service_ids, - ) - await self.api_client.publish_agent(os_app_id) - - self._report_progress(state, "[green]智能运维 Agent 创建成功[/green]", callback) - default_app_id = os_app_id # 智能运维作为默认 Agent - - # 创建慢卡检测智能助手(如果有相应的服务) - if systrace_service_ids: + # 获取所有 .service 文件 + service_files = list(self.service_dir.glob("*.service")) + if not service_files: self._report_progress( state, - f"[bold magenta]创建慢卡检测智能助手 (包含 {len(systrace_service_ids)} 个 MCP 服务)[/bold magenta]", + f"[yellow]未找到服务配置文件,跳过{operation_name}[/yellow]", callback, ) + return None - systrace_app_id = await self.api_client.create_agent( - "慢卡检测智能助手", - "检测集群中的慢卡问题", - systrace_service_ids, - ) - await self.api_client.publish_agent(systrace_app_id) - - self._report_progress(state, "[green]慢卡检测智能助手创建成功[/green]", callback) - - if default_app_id: - self._report_progress(state, "[dim]保存默认智能体配置...[/dim]", callback) - self.config_manager.set_default_app(default_app_id) - - return default_app_id or "" - - async def _register_mcp_service( - self, - config: McpConfig, - state: DeploymentState, - callback: Callable[[DeploymentState], None] | None, - ) -> str: - """注册 MCP 服务""" - self._report_progress(state, f" [blue]注册 {config.name}...[/blue]", callback) - return await self.api_client.register_mcp_service(config) + return service_files - async def _install_and_wait_mcp_service( + async def _process_service_files( self, - service_id: str, - config_name: str, + service_files: list[Path], state: DeploymentState, callback: Callable[[DeploymentState], None] | None, - ) -> bool: - """安装并等待 MCP 服务完成""" - self._report_progress(state, f" [cyan]安装 {config_name} (ID: {service_id})...[/cyan]", callback) - await self.api_client.install_mcp_service(service_id) + processor_func: Callable[ + [Path, DeploymentState, Callable[[DeploymentState], None] | None], + Awaitable[tuple[bool, str]], + ], + ) -> tuple[bool, list[str], list[str]]: + """ + 处理服务文件的通用框架 - self._report_progress(state, f" [dim]等待 {config_name} 安装完成...[/dim]", callback) - if not await self.api_client.wait_for_installation(service_id): - self._report_progress(state, f" [red]{config_name} 安装超时[/red]", callback) - return False + Args: + service_files: 要处理的服务文件列表 + state: 部署状态 + callback: 进度回调函数 + processor_func: 处理单个文件的函数,返回 (成功标志, 文件名) - return True + Returns: + tuple[bool, list[str], list[str]]: (总体是否成功, 成功的文件列表, 失败的文件列表) - async def _activate_mcp_service( - self, - service_id: str, - config_name: str, - state: DeploymentState, - callback: Callable[[DeploymentState], None] | None, - ) -> None: - """激活 MCP 服务""" - self._report_progress(state, f" [yellow]激活 {config_name}...[/yellow]", callback) - await self.api_client.activate_mcp_service(service_id) - self._report_progress(state, f" [green]{config_name} 处理完成[/green]", callback) + """ + success_files = [] + failed_files = [] - async def _process_mcp_service( - self, - config: McpConfig, - state: DeploymentState, - callback: Callable[[DeploymentState], None] | None, - ) -> str | None: - """处理单个 MCP 服务""" - # 如果是 SSE 类型,先验证 URL可用且为SSE - if config.mcp_type == "sse": - valid = await self._validate_sse_endpoint(config, state, callback) - if not valid: + for service_file in service_files: + try: + success, file_identifier = await processor_func(service_file, state, callback) + if success: + success_files.append(file_identifier) + else: + failed_files.append(file_identifier) + except Exception: + file_identifier = service_file.stem self._report_progress( state, - f" [red]MCP 服务 {config.name} SSE Endpoint 验证失败[/red]", + f" [red]处理 {file_identifier} 时发生异常[/red]", callback, ) - return None - try: - # 注册服务 - service_id = await self._register_mcp_service(config, state, callback) - - # 安装并等待完成 - if not await self._install_and_wait_mcp_service(service_id, config.name, state, callback): - return None - - # 激活服务 - await self._activate_mcp_service(service_id, config.name, state, callback) - - except (ApiError, httpx.RequestError, Exception) as e: - self._report_progress(state, f" [red]{config.name} 处理失败: {e}[/red]", callback) - logger.exception("MCP 服务 %s 处理失败", config.name) - return None + logger.exception("处理服务文件时发生异常: %s", service_file) + failed_files.append(file_identifier) - else: - return service_id + return len(failed_files) == 0, success_files, failed_files - async def _validate_sse_endpoint( + async def _install_service_files( self, - config: McpConfig, state: DeploymentState, callback: Callable[[DeploymentState], None] | None, ) -> bool: - """验证 SSE Endpoint 是否可用""" - url = config.config.get("url") or "" - self._report_progress( + """安装 systemd 服务文件""" + self._report_progress(state, "[cyan]安装 systemd 服务文件...[/cyan]", callback) + + # 获取服务文件列表 + service_files = self._get_service_files(state, callback, "服务文件安装") + if service_files is None: + return True + + # 处理所有服务文件 + overall_success, installed_files, failed_files = await self._process_service_files( + service_files, state, - f"[magenta]验证 SSE Endpoint: {config.name} -> {url}[/magenta]", callback, + self._install_single_service_file, ) - try: - async with httpx.AsyncClient(timeout=self.api_client.timeout) as client: - response = await client.get( - url, - headers={"Accept": "text/event-stream"}, - ) - if response.status_code != HTTP_OK: - self._report_progress( - state, - f" [red]{config.name} URL 响应码非 200: {response.status_code}[/red]", - callback, - ) - return False - content_type = response.headers.get("content-type", "") - if "text/event-stream" not in content_type: - self._report_progress( - state, - f" [red]{config.name} Content-Type 非 SSE: {content_type}[/red]", - callback, - ) - return False - self._report_progress(state, f" [green]{config.name} SSE Endpoint 验证通过[/green]", callback) - return True - except Exception as e: - self._report_progress(state, f" [red]{config.name} SSE 验证失败:[/red] {e}", callback) - logger.exception("验证 SSE Endpoint 失败: %s", url) - return False - async def _install_prerequisite_packages( + # 如果有成功安装的文件,重新加载 systemd 配置 + if installed_files: + if not await self._reload_systemd_daemon(state, callback): + return False + + self._report_progress( + state, + f"[green]成功安装 {len(installed_files)} 个服务文件[/green]", + callback, + ) + + return True + + async def _install_single_service_file( self, + service_file: Path, state: DeploymentState, callback: Callable[[DeploymentState], None] | None, - ) -> bool: - """安装必要的 RPM 包(已知包可用的情况下)""" - try: - # 检查是否存在以 "systrace" 开头的子目录(不区分大小写) - systrace_exists = self._check_systrace_config(state, callback) + ) -> tuple[bool, str]: + """安装单个服务文件""" + service_name = service_file.name + systemd_dir = Path("/etc/systemd/system") + target_path = systemd_dir / service_name - # 安装包(此时已知包是可用的) - if systrace_exists: - # 安装 sysTrace.rpmlist 中的包 - if not await self._install_rpm_packages("sysTrace.rpmlist", state, callback): - return False + self._report_progress( + state, + f" [blue]复制服务文件: {service_name}[/blue]", + callback, + ) - # 设置 systrace-mcpserver 服务开机启动并立即启动 - if not await self._setup_systrace_service(state, callback): - return False + try: + # 复制服务文件到 systemd 目录 + cmd = f"sudo cp {service_file} {target_path}" + process = await asyncio.create_subprocess_shell( + cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) - # 安装 mcp-servers.rpmlist 中的包 - return await self._install_rpm_packages("mcp-servers.rpmlist", state, callback) + stdout, stderr = await process.communicate() except Exception: - error_msg = "安装必要包失败" - self._report_progress(state, f"[red]{error_msg}[/red]", callback) - logger.exception(error_msg) - return False - - async def _check_rpm_packages_availability( - self, - rpm_list_files: list[str], - state: DeploymentState, - callback: Callable[[DeploymentState], None] | None, - ) -> bool: - """检查 RPM 包是否在 yum 源中可用""" - self._report_progress(state, "[cyan]检查 RPM 包在 yum 源中的可用性...[/cyan]", callback) - - if not self.resource_dir: self._report_progress( state, - "[red]资源目录未找到,无法检查 RPM 包可用性[/red]", + f" [red]复制 {service_name} 时发生异常[/red]", callback, ) - logger.error("资源目录未找到,无法检查 RPM 包可用性") - return False - - all_packages = [] - - # 收集所有需要检查的包 - for rpm_list_file in rpm_list_files: - rpm_list_path = self.resource_dir / rpm_list_file - - if not rpm_list_path.exists(): - self._report_progress( - state, - f"[yellow]RPM 列表文件不存在: {rpm_list_file},跳过检查[/yellow]", - callback, - ) - logger.warning("RPM 列表文件不存在: %s", rpm_list_path) - continue - - try: - with rpm_list_path.open(encoding="utf-8") as f: - packages = [line.strip() for line in f if line.strip() and not line.startswith("#")] - all_packages.extend(packages) - except Exception as e: + logger.exception("复制服务文件时发生异常: %s", service_file) + return False, service_name + else: + if process.returncode == 0: self._report_progress( state, - f"[red]读取 RPM 列表文件失败:[/red] {rpm_list_file} - {e}", + f" [green]{service_name} 复制成功[/green]", callback, ) - logger.exception("读取 RPM 列表文件失败: %s", rpm_list_path) - return False - - if not all_packages: - self._report_progress(state, "[dim]没有要检查的 RPM 包[/dim]", callback) - return True + logger.info("服务文件复制成功: %s -> %s", service_file, target_path) + return True, service_name - # 检查每个包的可用性 - unavailable_packages = [] - - for package in all_packages: - # 使用 dnf list available 检查包是否可用 - check_cmd = f"dnf list available {package}" + error_output = stderr.decode("utf-8") if stderr else "" + self._report_progress( + state, + f" [red]{service_name} 复制失败: {error_output}[/red]", + callback, + ) + logger.error("服务文件复制失败: %s, 错误: %s", service_file, error_output) + return False, service_name - try: - process = await asyncio.create_subprocess_shell( - check_cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) + async def _reload_systemd_daemon( + self, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> bool: + """重新加载 systemd 配置""" + self._report_progress(state, "[cyan]重新加载 systemd 配置...[/cyan]", callback) - stdout, stderr = await process.communicate() + try: + cmd = "sudo systemctl daemon-reload" + process = await asyncio.create_subprocess_shell( + cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) - if process.returncode != 0: - unavailable_packages.append(package) - logger.warning("RPM 包不可用: %s", package) + stdout, stderr = await process.communicate() - except Exception as e: + except Exception: + self._report_progress( + state, + "[red]重新加载 systemd 配置时发生异常[/red]", + callback, + ) + logger.exception("重新加载 systemd 配置时发生异常") + return False + else: + if process.returncode == 0: self._report_progress( state, - f" [red]检查包 {package} 失败:[/red] {e}", + "[green]systemd 配置重新加载成功[/green]", callback, ) - logger.exception("检查 RPM 包可用性失败: %s", package) - unavailable_packages.append(package) + logger.info("systemd 配置重新加载成功") + return True - # 如果有不可用的包,返回 False - if unavailable_packages: + error_output = stderr.decode("utf-8") if stderr else "" self._report_progress( state, - f"[dim]以下 RPM 包不可用,跳过智能体初始化: {', '.join(unavailable_packages)}[/dim]", + f"[red]systemd 配置重新加载失败: {error_output}[/red]", callback, ) - logger.error("发现不可用的 RPM 包,跳过智能体初始化: %s", unavailable_packages) + logger.error("systemd 配置重新加载失败: %s", error_output) return False - self._report_progress( - state, - "[green]所有 RPM 包均可用,继续智能体初始化[/green]", - callback, - ) - logger.info("所有 RPM 包均可用") - return True - - async def _check_prerequisite_packages_availability( + async def _start_mcp_servers( self, state: DeploymentState, callback: Callable[[DeploymentState], None] | None, - ) -> AgentInitStatus: - """ - 检查必要的 RPM 包是否在 yum 源中可用 - - Returns: - AgentInitStatus: SUCCESS 表示所有包可用,SKIPPED 表示有包不可用应跳过 - - """ - try: - # 准备要检查的 RPM 列表文件 - rpm_files_to_check = ["mcp-servers.rpmlist"] + ) -> bool: + """运行脚本拉起 MCP Server 进程""" + self._report_progress(state, "[cyan]启动 MCP Server 进程...[/cyan]", callback) - # 检查是否存在以 "systrace" 开头的子目录(不区分大小写) - systrace_exists = self._check_systrace_config(state, callback) - if systrace_exists: - rpm_files_to_check.append("sysTrace.rpmlist") + if not self.run_script_path or not self.run_script_path.exists(): + self._report_progress( + state, + f"[red]MCP 启动脚本不存在: {self.run_script_path}[/red]", + callback, + ) + logger.error("MCP 启动脚本不存在: %s", self.run_script_path) + return False - # 检查包可用性 - packages_available = await self._check_rpm_packages_availability(rpm_files_to_check, state, callback) + # 1. 先检查并清理可能存在的旧进程 + if not await self._cleanup_old_mcp_processes(state, callback): + # 清理失败不会阻止继续执行,只是记录警告 + self._report_progress( + state, + "[yellow]清理旧进程时遇到问题,但继续执行启动脚本[/yellow]", + callback, + ) + + # 2. 执行启动脚本 + try: + # 执行 run.sh 脚本 + cmd = f"bash {self.run_script_path}" + self._report_progress(state, f" [blue]执行命令: {cmd}[/blue]", callback) + logger.info("执行 MCP 启动脚本: %s", cmd) - if not packages_available: + process = await asyncio.create_subprocess_shell( + cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + + stdout, _ = await process.communicate() + output = stdout.decode("utf-8") if stdout else "" + + if process.returncode == 0: self._report_progress( state, - "[yellow]MCP Server 相关 RPM 包可用性检查失败,跳过智能体初始化,其他部署步骤将继续进行[/yellow]", + "[green]MCP Server 启动脚本执行成功[/green]", callback, ) - return AgentInitStatus.SKIPPED + logger.info("MCP Server 启动脚本执行成功") + return True except Exception: - error_msg = "检查 RPM 包可用性失败" + error_msg = "执行 MCP Server 启动脚本失败" self._report_progress(state, f"[red]{error_msg}[/red]", callback) logger.exception(error_msg) - return AgentInitStatus.SKIPPED # 检查失败也视为跳过,而不是整个部署失败 - + return False else: - return AgentInitStatus.SUCCESS + self._report_progress( + state, + f"[red]MCP Server 启动脚本执行失败 (返回码: {process.returncode})[/red]", + callback, + ) + logger.error("MCP Server 启动脚本执行失败: %s, 输出: %s", cmd, output) + return False - def _check_systrace_config( + async def _cleanup_old_mcp_processes( self, state: DeploymentState, callback: Callable[[DeploymentState], None] | None, ) -> bool: - """检查是否存在以 systrace 开头的配置目录""" - self._report_progress(state, "[cyan]检查 sysTrace 配置...[/cyan]", callback) + """检查并清理可能存在的旧 MCP 进程""" + # 静默获取服务文件列表 + if not self.service_dir or not self.service_dir.exists(): + return True - if not self.resource_dir or not self.mcp_config_dir: - self._report_progress(state, "[yellow]资源目录或 MCP 配置目录不存在[/yellow]", callback) - return False + service_files = list(self.service_dir.glob("*.service")) + if not service_files: + return True - if not self.mcp_config_dir.exists(): - self._report_progress(state, "[yellow]MCP 配置目录不存在[/yellow]", callback) - return False + # 静默清理服务 + for service_file in service_files: + service_name = service_file.stem # 去掉 .service 后缀 + await self._stop_service(service_name) - for subdir in self.mcp_config_dir.iterdir(): - if subdir.is_dir() and subdir.name.lower().startswith("systrace"): - self._report_progress(state, f"[green]发现 sysTrace 配置: {subdir.name}[/green]", callback) - logger.info("发现 sysTrace 配置目录: %s", subdir.name) - return True + return True - self._report_progress(state, "[dim]未发现 sysTrace 配置[/dim]", callback) - return False + async def _stop_service(self, service_name: str) -> None: + """静默停止服务""" + try: + # 检查服务状态 + status_cmd = f"systemctl is-active {service_name}" + status_process = await asyncio.create_subprocess_shell( + status_cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, _ = await status_process.communicate() + status = stdout.decode("utf-8").strip() if stdout else "" + + # 如果服务正在运行,静默停止它 + if status == "active": + stop_cmd = f"sudo systemctl stop {service_name}" + await asyncio.create_subprocess_shell( + stop_cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + except (OSError, subprocess.SubprocessError): + # 静默忽略任何错误 + logger.debug("静默停止服务时发生异常: %s", service_name) - async def _install_rpm_packages( + async def _verify_mcp_services( self, - rpm_list_file: str, state: DeploymentState, callback: Callable[[DeploymentState], None] | None, ) -> bool: - """安装指定 RPM 列表文件中的包""" - if not self.resource_dir: + """验证 MCP Server 服务状态""" + self._report_progress(state, "[cyan]验证 MCP Server 服务状态...[/cyan]", callback) + + # 获取服务文件列表 + service_files = self._get_service_files(state, callback, "服务验证") + if service_files is None: + return True + + # 处理所有服务文件 + overall_success, active_services, failed_services = await self._process_service_files( + service_files, + state, + callback, + self._verify_single_service, + ) + + if failed_services: self._report_progress( state, - f"[red]资源目录未找到,无法安装 {rpm_list_file}[/red]", + f"[red]关键服务状态异常: {', '.join(failed_services)},停止初始化[/red]", callback, ) - logger.error("资源目录未找到,无法安装 RPM 包: %s", rpm_list_file) + logger.error("关键服务状态异常,停止初始化: %s", failed_services) return False - rpm_list_path = self.resource_dir / rpm_list_file + self._report_progress(state, "[green]MCP Server 服务验证完成[/green]", callback) + return True - if not rpm_list_path.exists(): + async def _verify_single_service( + self, + service_file: Path, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + retry_count: int = 0, + ) -> tuple[bool, str]: + """验证单个服务状态""" + await asyncio.sleep(2) + + service_name = service_file.stem # 去掉 .service 后缀 + + # 限制递归次数,最多重试6次(30秒) + max_retries = 6 + if retry_count > max_retries: self._report_progress( state, - f"[yellow]RPM 列表文件不存在: {rpm_list_file}[/yellow]", + f" [red]{service_name} 启动超时 (30秒)[/red]", callback, ) - logger.warning("RPM 列表文件不存在: %s", rpm_list_path) - return True # 文件不存在不算失败,继续执行 - - self._report_progress(state, f"[cyan]安装 {rpm_list_file} 中的 RPM 包...[/cyan]", callback) - - try: - # 读取 RPM 包列表 - with rpm_list_path.open(encoding="utf-8") as f: - packages = [line.strip() for line in f if line.strip() and not line.startswith("#")] - - if not packages: - self._report_progress(state, f"[dim]{rpm_list_file} 中没有要安装的包[/dim]", callback) - return True - - # 使用 dnf 安装包 - package_list = " ".join(packages) - install_cmd = f"sudo dnf install -y {package_list}" + logger.error("服务启动超时: %s", service_name) + return False, service_name + if retry_count == 0: + self._report_progress( + state, + f" [magenta]检查服务状态: {service_name}[/magenta]", + callback, + ) + else: self._report_progress( state, - f" [blue]执行安装命令: {install_cmd}[/blue]", + f" [dim]{service_name} 重新检查状态... (第 {retry_count} 次)[/dim]", callback, ) - logger.info("执行 RPM 包安装命令: %s", install_cmd) - # 使用 asyncio.create_subprocess_shell 执行命令 + try: + # 使用 systemctl status 获取详细状态信息 + cmd = f"systemctl status {service_name}" process = await asyncio.create_subprocess_shell( - install_cmd, + cmd, stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, + stderr=asyncio.subprocess.PIPE, ) - stdout, _ = await process.communicate() + stdout, stderr = await process.communicate() output = stdout.decode("utf-8") if stdout else "" - if process.returncode == 0: + except Exception: + self._report_progress( + state, + f" [red]检查 {service_name} 状态失败[/red]", + callback, + ) + logger.exception("检查服务状态失败: %s", service_name) + return False, service_name + else: + # systemctl status 返回码: 0=active, 1=dead, 2=unknown, 3=not-found, 4=permission-denied + if process.returncode == 0 and "active (running)" in output.lower(): + # 服务正常运行 self._report_progress( state, - f" [green]{rpm_list_file} 中的包安装成功[/green]", + f" [green]{service_name} 状态正常 (active running)[/green]", callback, ) - logger.info("RPM 包安装成功: %s", package_list) - else: + logger.info("服务状态正常: %s", service_name) + return True, service_name + + # 分析输出内容,检查是否有失败信息 + if "failed" in output.lower() or "code=exited" in output.lower(): self._report_progress( state, - f" [red]{rpm_list_file} 中的包安装失败 (返回码: {process.returncode})[/red]", + f" [red]{service_name} 服务启动失败[/red]", callback, ) - logger.error("RPM 包安装失败: %s, 输出: %s", package_list, output) - return False + logger.error("服务启动失败: %s, 详细信息: %s", service_name, output.strip()) + return False, service_name - except Exception: - error_msg = f"安装 {rpm_list_file} 失败" - self._report_progress(state, f" [red]{error_msg}[/red]", callback) - logger.exception(error_msg) - return False + # 检查是否真的在启动中(activating 状态) + if "activating" in output.lower() and "start" in output.lower(): + if retry_count == 0: + self._report_progress( + state, + f" [yellow]{service_name} 正在启动中,等待启动完成...[/yellow]", + callback, + ) + logger.info("服务正在启动中,等待启动完成: %s", service_name) - return True + # 等待3秒后递归调用自己 + await asyncio.sleep(3) + return await self._verify_single_service(service_file, state, callback, retry_count + 1) - async def _setup_systrace_service( + # 其他状态都认为是异常 + self._report_progress( + state, + f" [red]{service_name} 状态异常 (返回码: {process.returncode})[/red]", + callback, + ) + logger.warning("服务状态异常: %s, 返回码: %d, 输出: %s", service_name, process.returncode, output.strip()) + return False, service_name + + async def _register_all_mcp_services( self, state: DeploymentState, callback: Callable[[DeploymentState], None] | None, - ) -> bool: - """设置 systrace-mcpserver 服务""" - service_name = "systrace-mcpserver" - self._report_progress(state, f"[magenta]设置 {service_name} 服务...[/magenta]", callback) + ) -> dict[str, str]: + """ + 注册所有 MCP 服务 - try: - # 启用服务开机启动 - enable_cmd = f"sudo systemctl enable {service_name}" - self._report_progress(state, f" [cyan]设置开机启动: {enable_cmd}[/cyan]", callback) + Returns: + dict[str, str]: MCP 路径名 -> 服务 ID 的映射 - process = await asyncio.create_subprocess_shell( - enable_cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, - ) + """ + self._report_progress(state, "[cyan]注册 MCP 服务...[/cyan]", callback) - stdout, _ = await process.communicate() - output = stdout.decode("utf-8") if stdout else "" + # 加载 MCP 配置 + configs = await self._load_mcp_configs(state, callback) + if not configs: + return {} - if process.returncode != 0: + mcp_service_mapping = {} + + for config_path, config in configs: + service_id = await self._process_mcp_service(config, state, callback) + if service_id: + # 使用配置目录名作为 MCP 路径名 + mcp_path_name = config_path.parent.name + mcp_service_mapping[mcp_path_name] = service_id self._report_progress( state, - f" [red]设置 {service_name} 开机启动失败: {output}[/red]", + f" [green]{config.name} 注册成功: {mcp_path_name} -> {service_id}[/green]", + callback, + ) + else: + self._report_progress( + state, + f" [red]MCP 服务 {config.name} 注册失败[/red]", callback, ) - logger.error("设置服务开机启动失败: %s, 输出: %s", service_name, output) - return False - # 启动服务 - start_cmd = f"sudo systemctl start {service_name}" - self._report_progress(state, f" [blue]启动服务: {start_cmd}[/blue]", callback) + self._report_progress( + state, + f"[green]MCP 服务注册完成,成功 {len(mcp_service_mapping)} 个[/green]", + callback, + ) + return mcp_service_mapping - process = await asyncio.create_subprocess_shell( - start_cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, + async def _create_agents_from_config( + self, + mcp_service_mapping: dict[str, str], + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> str | None: + """从配置文件创建智能体""" + self._report_progress(state, "[cyan]读取应用配置并创建智能体...[/cyan]", callback) + + # 读取应用配置 + app_configs = await self._load_app_configs(state, callback) + if not app_configs: + return None + + created_agents = [] + default_app_id = None + + for i, app_config in enumerate(app_configs): + app_id = await self._create_single_agent( + app_config, + mcp_service_mapping, + state, + callback, ) - stdout, _ = await process.communicate() - output = stdout.decode("utf-8") if stdout else "" + if app_id: + created_agents.append(app_id) - if process.returncode == 0: + # 第一个智能体设置为默认智能体 + if i == 0: + default_app_id = app_id + self._report_progress( + state, + f" [dim]设置默认智能体: {app_config.name}[/dim]", + callback, + ) + self.config_manager.set_default_app(app_id) + + if created_agents: + self._report_progress( + state, + f"[green]成功创建 {len(created_agents)} 个智能体[/green]", + callback, + ) + return default_app_id + + self._report_progress(state, "[red]未能创建任何智能体[/red]", callback) + return None + + async def _create_single_agent( + self, + app_config: AppConfig, + mcp_service_mapping: dict[str, str], + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> str | None: + """创建单个智能体""" + self._report_progress( + state, + f"[magenta]创建智能体: {app_config.name}[/magenta]", + callback, + ) + + # 将 MCP 路径转换为服务 ID + mcp_service_ids, missing_services = self._resolve_mcp_services( + app_config.mcp_path, + mcp_service_mapping, + ) + + if missing_services: + self._report_progress( + state, + f" [yellow]缺少 MCP 服务: {', '.join(missing_services)},跳过[/yellow]", + callback, + ) + logger.warning("智能体 %s 缺少 MCP 服务: %s", app_config.name, missing_services) + return None + + if not mcp_service_ids: + self._report_progress( + state, + f" [yellow]智能体 {app_config.name} 没有可用的 MCP 服务,跳过[/yellow]", + callback, + ) + return None + + try: + # 创建智能体 + app_id = await self.api_client.create_agent( + app_config.name, + app_config.description, + mcp_service_ids, + ) + + # 发布智能体(如果配置要求发布) + if app_config.published: + await self.api_client.publish_agent(app_id) + + except Exception: + self._report_progress( + state, + f" [red]创建智能体 {app_config.name} 失败[/red]", + callback, + ) + logger.exception("创建智能体失败: %s", app_config.name) + return None + else: + self._report_progress( + state, + f" [green]智能体 {app_config.name} 创建成功: {app_id}[/green]", + callback, + ) + return app_id + + def _resolve_mcp_services( + self, + mcp_paths: list[str], + mcp_service_mapping: dict[str, str], + ) -> tuple[list[str], list[str]]: + """解析 MCP 路径为服务 ID""" + mcp_service_ids = [] + missing_services = [] + + for mcp_path in mcp_paths: + if mcp_path in mcp_service_mapping: + mcp_service_ids.append(mcp_service_mapping[mcp_path]) + else: + missing_services.append(mcp_path) + + return mcp_service_ids, missing_services + + async def _load_app_configs( + self, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> list[AppConfig]: + """加载应用配置""" + self._report_progress(state, "[cyan]加载应用配置文件...[/cyan]", callback) + + if not self.app_config_path or not self.app_config_path.exists(): + self._report_progress( + state, + f"[red]应用配置文件不存在: {self.app_config_path}[/red]", + callback, + ) + logger.error("应用配置文件不存在: %s", self.app_config_path) + return [] + + try: + with self.app_config_path.open(encoding="utf-8") as f: + config_data = toml.load(f) + + applications = config_data.get("applications", []) + if not applications: self._report_progress( state, - f" [green]{service_name} 服务启动成功[/green]", + "[yellow]配置文件中没有找到应用定义[/yellow]", callback, ) - logger.info("sysTrace 服务启动成功: %s", service_name) - else: + logger.warning("配置文件中没有找到应用定义") + return [] + + app_configs = [] + for app_data in applications: + try: + app_config = AppConfig( + app_type=app_data.get("appType", "agent"), + name=app_data["name"], + description=app_data["description"], + mcp_path=app_data["mcpPath"], + published=app_data.get("published", True), + ) + app_configs.append(app_config) + except KeyError as e: + self._report_progress( + state, + f" [red]应用配置缺少必需字段: {e}[/red]", + callback, + ) + logger.exception("应用配置缺少必需字段") + continue + + except Exception: + error_msg = f"加载应用配置文件失败: {self.app_config_path}" + self._report_progress(state, f"[red]{error_msg}[/red]", callback) + logger.exception(error_msg) + return [] + else: + self._report_progress( + state, + f"[green]成功加载 {len(app_configs)} 个应用配置[/green]", + callback, + ) + return app_configs + + async def _load_mcp_configs( + self, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> list[tuple[Path, McpConfig]]: + """加载 MCP 配置""" + self._report_progress(state, "[cyan]加载 MCP 配置文件...[/cyan]", callback) + + config_loader = McpConfigLoader(self.mcp_config_dir) + configs = config_loader.load_all_configs() + + if not configs: + self._report_progress(state, "[yellow]未找到 MCP 配置文件[/yellow]", callback) + return [] + + self._report_progress(state, f"[green]成功加载 {len(configs)} 个 MCP 配置[/green]", callback) + return configs + + async def _process_mcp_service( + self, + config: McpConfig, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> str | None: + """处理单个 MCP 服务""" + # 如果是 SSE 类型,先验证 URL 可用且为 SSE + if config.mcp_type == "sse": + valid = await self._validate_sse_endpoint(config, state, callback) + if not valid: self._report_progress( state, - f" [red]{service_name} 服务启动失败: {output}[/red]", + f" [red]MCP 服务 {config.name} SSE Endpoint 验证失败[/red]", callback, ) - logger.error("sysTrace 服务启动失败: %s, 输出: %s", service_name, output) - return False + return None + try: + # 注册服务 + service_id = await self._register_mcp_service(config, state, callback) - except Exception: - error_msg = f"设置 {service_name} 服务失败" - self._report_progress(state, f" [red]{error_msg}[/red]", callback) - logger.exception(error_msg) + # 安装并等待完成 + if not await self._install_and_wait_mcp_service(service_id, config.name, state, callback): + return None + + # 激活服务 + await self._activate_mcp_service(service_id, config.name, state, callback) + + except (ApiError, httpx.RequestError, Exception) as e: + self._report_progress(state, f" [red]{config.name} 处理失败: {e}[/red]", callback) + logger.exception("MCP 服务 %s 处理失败", config.name) + return None + + return service_id + + async def _register_mcp_service( + self, + config: McpConfig, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> str: + """注册 MCP 服务""" + self._report_progress(state, f" [blue]注册 {config.name}...[/blue]", callback) + return await self.api_client.register_mcp_service(config) + + async def _install_and_wait_mcp_service( + self, + service_id: str, + config_name: str, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> bool: + """安装并等待 MCP 服务完成""" + self._report_progress(state, f" [cyan]安装 {config_name} (ID: {service_id})...[/cyan]", callback) + await self.api_client.install_mcp_service(service_id) + + self._report_progress(state, f" [dim]等待 {config_name} 安装完成...[/dim]", callback) + if not await self.api_client.wait_for_installation(service_id): + self._report_progress(state, f" [red]{config_name} 安装超时[/red]", callback) return False return True + + async def _activate_mcp_service( + self, + service_id: str, + config_name: str, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> None: + """激活 MCP 服务""" + self._report_progress(state, f" [yellow]激活 {config_name}...[/yellow]", callback) + await self.api_client.activate_mcp_service(service_id) + self._report_progress(state, f" [green]{config_name} 处理完成[/green]", callback) + + async def _validate_sse_endpoint( + self, + config: McpConfig, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> bool: + """验证 SSE Endpoint 是否可用""" + url = config.config.get("url") or "" + self._report_progress( + state, + f"[magenta]验证 SSE Endpoint: {config.name} -> {url}[/magenta]", + callback, + ) + + # 重试配置 + max_attempts = 6 # 30秒 / 5秒 = 6次 + retry_interval = 5 # 5秒重试间隔 + + for attempt in range(1, max_attempts + 1): + try: + # 使用流式请求,只读取响应头,避免 SSE 连接一直保持开放 + async with ( + httpx.AsyncClient(timeout=self.api_client.timeout) as client, + client.stream("GET", url, headers={"Accept": "text/event-stream"}) as response, + ): + if response.status_code == HTTP_OK: + # 验证成功 + self._report_progress( + state, + f" [green]{config.name} SSE Endpoint 验证通过[/green]", + callback, + ) + logger.info("SSE Endpoint 验证成功: %s (尝试 %d 次)", url, attempt) + return True + + logger.debug( + "SSE Endpoint 响应码非 200: %s, 状态码: %d, 尝试: %d/%d", + url, + response.status_code, + attempt, + max_attempts, + ) + + except (httpx.RequestError, httpx.HTTPStatusError) as e: + logger.debug("SSE Endpoint 连接失败: %s, 错误: %s, 尝试: %d/%d", url, e, attempt, max_attempts) + + # 如果还有重试机会,等待后继续 + if attempt < max_attempts: + await asyncio.sleep(retry_interval) + + # 所有尝试都失败了 + self._report_progress( + state, + f" [red]{config.name} SSE Endpoint 验证失败: 3分钟内无法连接[/red]", + callback, + ) + logger.error( + "SSE Endpoint 验证最终失败: %s (尝试了 %d 次,耗时 %d 秒)", + url, + max_attempts, + max_attempts * retry_interval, + ) + return False