From 38b322967fcfc2bb1ecd75dc32878d2c8f71c751 Mon Sep 17 00:00:00 2001 From: Hongyu Shi Date: Fri, 5 Sep 2025 10:40:19 +0800 Subject: [PATCH 1/7] =?UTF-8?q?feat(deploy):=20=E6=9B=B4=E6=96=B0=20openeu?= =?UTF-8?q?ler-intelligence-installer=20=E5=AD=90=E5=8C=85=E7=9A=84?= =?UTF-8?q?=E4=BE=9D=E8=B5=96=E9=A1=B9=EF=BC=8C=E6=B7=BB=E5=8A=A0=20python?= =?UTF-8?q?3-requests=20=E5=92=8C=20python3-aiohttp?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Hongyu Shi --- distribution/linux/euler-copilot-shell.spec | 2 + .../install_openEulerIntelligence.sh | 49 ++----------------- 2 files changed, 6 insertions(+), 45 deletions(-) diff --git a/distribution/linux/euler-copilot-shell.spec b/distribution/linux/euler-copilot-shell.spec index 086e82d..5a2a83e 100644 --- a/distribution/linux/euler-copilot-shell.spec +++ b/distribution/linux/euler-copilot-shell.spec @@ -34,6 +34,8 @@ openEuler Intelligence 智能 Shell 是一个智能命令行程序。 # 部署安装工具子包 %package -n openeuler-intelligence-installer Summary: openEuler Intelligence 部署安装脚本 +Requires: python3-aiohttp +Requires: python3-requests BuildArch: noarch %description -n openeuler-intelligence-installer diff --git a/scripts/deploy/2-install-dependency/install_openEulerIntelligence.sh b/scripts/deploy/2-install-dependency/install_openEulerIntelligence.sh index 761f4e0..e08ded0 100644 --- a/scripts/deploy/2-install-dependency/install_openEulerIntelligence.sh +++ b/scripts/deploy/2-install-dependency/install_openEulerIntelligence.sh @@ -526,49 +526,13 @@ check_pip_framework() { # 根据 Python 版本选择包列表 declare -A REQUIRED_PACKAGES if [[ "$python_version" =~ ^3\.(11|[2-9][0-9])$ ]]; then - # Python 3.11 或更新版本,使用当前列表 + # Python 3.11 或更新版本,检查并安装 DNF 源中缺失的 pip 依赖 REQUIRED_PACKAGES=( ["pymongo"]="" - ["requests"]="" ["pydantic"]="" - ["aiohttp"]="" ) elif [[ "$python_version" =~ ^3\.(9|10)$ ]]; then - # Python 3.9 或 3.10,使用完整列表 - REQUIRED_PACKAGES=( - ["requests"]="" - ["aiohttp"]="" - ["aiofiles"]="24.1.0" - ["asyncer"]="0.0.8" - ["asyncpg"]="0.30.0" - ["cryptography"]="44.0.2" - ["fastapi"]="0.115.12" - ["httpx"]="0.28.1" - ["httpx-sse"]="0.4.0" - ["jinja2"]="3.1.6" - ["jionlp"]="1.5.20" - ["jsonschema"]="4.23.0" - ["lancedb"]="0.21.2" - ["minio"]="7.2.15" - ["ollama"]="0.5.1" - ["openai"]="1.91.0" - ["pandas"]="2.2.3" - ["pgvector"]="0.4.1" - ["pillow"]="10.3.0" - ["pydantic"]="2.11.7" - ["pymongo"]="4.12.1" - ["python-jsonpath"]="1.3.0" - ["python-magic"]="0.4.27" - ["python-multipart"]="0.0.20" - ["pytz"]="2025.2" - ["pyyaml"]="6.0.2" - ["rich"]="13.9.4" - ["sqids"]="0.5.1" - ["sqlalchemy"]="2.0.41" - ["tiktoken"]="0.9.0" - ["toml"]="0.10.2" - ["uvicorn"]="0.34.0" - ) + # Python 3.9 或 3.10,RPM 包安装过程已处理 pip 依赖 # 对于 Python 3.9,单独安装 MCP 的 wheel 包 local wheel_path="../5-resource/pip/mcp-1.6.0-py3-none-any.whl" if [ -f "$wheel_path" ]; then @@ -579,13 +543,8 @@ check_pip_framework() { echo -e "${COLOR_WARNING}[Warning] Wheel 文件不存在: $wheel_path${COLOR_RESET}" fi else - echo -e "${COLOR_WARNING}[Warning] 不支持的 Python 版本: $python_version,使用默认列表${COLOR_RESET}" - REQUIRED_PACKAGES=( - ["pymongo"]="" - ["requests"]="" - ["pydantic"]="" - ["aiohttp"]="" - ) + echo -e "${COLOR_WARNING}[Warning] 不支持的 Python 版本: $python_version${COLOR_RESET}" + return 1 fi local need_install=0 -- Gitee From 65c38fe98f8ae79a39128fc5f18709dc01b071f8 Mon Sep 17 00:00:00 2001 From: Hongyu Shi Date: Fri, 5 Sep 2025 16:12:46 +0800 Subject: [PATCH 2/7] =?UTF-8?q?feat(deploy-agent):=20=E9=87=8D=E6=9E=84?= =?UTF-8?q?=E6=99=BA=E8=83=BD=E4=BD=93=E7=AE=A1=E7=90=86=E9=80=BB=E8=BE=91?= =?UTF-8?q?=EF=BC=8C=E6=B7=BB=E5=8A=A0=E5=BA=94=E7=94=A8=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E5=B9=B6=E4=BC=98=E5=8C=96=20MCP=20=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E6=B3=A8=E5=86=8C=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Hongyu Shi --- scripts/deploy/5-resource/mcp-servers.rpmlist | 0 .../mcp_config/perf_mcp/config.json | 16 - scripts/deploy/5-resource/sysTrace.rpmlist | 2 - src/app/deployment/agent.py | 879 +++++++++--------- 4 files changed, 419 insertions(+), 478 deletions(-) delete mode 100644 scripts/deploy/5-resource/mcp-servers.rpmlist delete mode 100644 scripts/deploy/5-resource/mcp_config/perf_mcp/config.json delete mode 100644 scripts/deploy/5-resource/sysTrace.rpmlist diff --git a/scripts/deploy/5-resource/mcp-servers.rpmlist b/scripts/deploy/5-resource/mcp-servers.rpmlist deleted file mode 100644 index e69de29..0000000 diff --git a/scripts/deploy/5-resource/mcp_config/perf_mcp/config.json b/scripts/deploy/5-resource/mcp_config/perf_mcp/config.json deleted file mode 100644 index ec33562..0000000 --- a/scripts/deploy/5-resource/mcp_config/perf_mcp/config.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "name": "perf 分析工具", - "overview": "perf 分析工具", - "description": "perf 分析工具", - "mcpType": "sse", - "author": "root", - "config": { - "env": {}, - "autoApprove": [], - "disabled": false, - "auto_install": false, - "description": "", - "timeout": 60, - "url": "http://127.0.0.1:12141/sse" - } -} \ No newline at end of file diff --git a/scripts/deploy/5-resource/sysTrace.rpmlist b/scripts/deploy/5-resource/sysTrace.rpmlist deleted file mode 100644 index fee8462..0000000 --- a/scripts/deploy/5-resource/sysTrace.rpmlist +++ /dev/null @@ -1,2 +0,0 @@ -sysTrace-failslow -sysTrace-mcpserver diff --git a/src/app/deployment/agent.py b/src/app/deployment/agent.py index 54e332f..6fca86a 100644 --- a/src/app/deployment/agent.py +++ b/src/app/deployment/agent.py @@ -19,6 +19,7 @@ from pathlib import Path from typing import TYPE_CHECKING, Any import httpx +import toml from config.manager import ConfigManager from log.manager import get_logger @@ -73,6 +74,17 @@ class AgentInfo: mcp_services: list[str] +@dataclass +class AppConfig: + """应用配置信息(从 TOML 文件读取)""" + + app_type: str + name: str + description: str + mcp_path: list[str] + published: bool = True + + class McpConfigLoader: """MCP 配置加载器""" @@ -357,9 +369,9 @@ class AgentManager: self.config_manager = ConfigManager() resource_paths = [ - Path("/usr/lib/openeuler-intelligence/scripts/5-resource"), # 生产环境 - Path("scripts/deploy/5-resource"), # 开发环境(相对路径) - Path(__file__).parent.parent.parent / "scripts/deploy/5-resource", # 开发环境(绝对路径) + Path("/usr/lib/euler-copilot-framework/mcp_center"), # 生产环境 + Path("scripts/deploy/5-resource"), # 旧的开发环境(兼容) + Path(__file__).parent.parent.parent / "scripts/deploy/5-resource", # 旧的开发环境(绝对路径兼容) ] self.resource_dir = next((p for p in resource_paths if p.exists()), None) @@ -369,6 +381,9 @@ class AgentManager: logger.info("[DeploymentHelper] 使用资源路径: %s", self.resource_dir) self.mcp_config_dir = self.resource_dir / "mcp_config" + self.run_script_path = self.resource_dir / "run.sh" + self.service_dir = self.resource_dir / "service" + self.app_config_path = self.mcp_config_dir / "mcp_to_app_config.toml" async def initialize_agents( self, @@ -385,53 +400,43 @@ class AgentManager: self._report_progress(state, "[bold blue]开始初始化智能体...[/bold blue]", progress_callback) try: - # 预处理:检查必要的 RPM 包可用性 - rpm_availability_result = await self._check_prerequisite_packages_availability(state, progress_callback) - if rpm_availability_result == AgentInitStatus.SKIPPED: - return AgentInitStatus.SKIPPED - - # 安装必要的 RPM 包 - if not await self._install_prerequisite_packages(state, progress_callback): + # 1. 运行脚本拉起 MCP Server 进程 + if not await self._start_mcp_servers(state, progress_callback): return AgentInitStatus.FAILED - # 加载配置 - configs = await self._load_mcp_configs(state, progress_callback) - if not configs: + # 2. 验证 MCP Server 服务状态 + if not await self._verify_mcp_services(state, progress_callback): return AgentInitStatus.FAILED - # 处理 MCP 服务 - os_service_ids, systrace_service_ids = await self._process_all_mcp_services( - configs, - state, - progress_callback, - ) - - if not os_service_ids and not systrace_service_ids: - self._report_progress(state, "[red]所有 MCP 服务处理失败[/red]", progress_callback) + # 3. 加载 MCP 配置并注册服务 + mcp_service_mapping = await self._register_all_mcp_services(state, progress_callback) + if not mcp_service_mapping: return AgentInitStatus.FAILED - # 创建智能体 - default_app_id = await self._create_multiple_agents( - os_service_ids, - systrace_service_ids, + # 4. 读取应用配置并创建智能体 + default_app_id = await self._create_agents_from_config( + mcp_service_mapping, state, progress_callback, ) - self._report_progress( - state, - f"[bold green]智能体初始化完成! 默认 App ID: {default_app_id}[/bold green]", - progress_callback, - ) - logger.info("智能体初始化成功完成,默认 App ID: %s", default_app_id) + if default_app_id: + self._report_progress( + state, + f"[bold green]智能体初始化完成! 默认 App ID: {default_app_id}[/bold green]", + progress_callback, + ) + logger.info("智能体初始化成功完成,默认 App ID: %s", default_app_id) + return AgentInitStatus.SUCCESS except Exception: error_msg = "智能体初始化失败" self._report_progress(state, f"[red]{error_msg}[/red]", progress_callback) logger.exception(error_msg) return AgentInitStatus.FAILED - else: + # 如果没有创建任何智能体,显示警告并返回成功状态 + self._report_progress(state, "[yellow]未能创建任何智能体[/yellow]", progress_callback) return AgentInitStatus.SUCCESS def _report_progress( @@ -445,563 +450,517 @@ class AgentManager: if callback: callback(state) - async def _load_mcp_configs( - self, - state: DeploymentState, - callback: Callable[[DeploymentState], None] | None, - ) -> list[tuple[Path, McpConfig]]: - """加载 MCP 配置""" - self._report_progress(state, "[cyan]加载 MCP 配置文件...[/cyan]", callback) - - config_loader = McpConfigLoader(self.mcp_config_dir) - configs = config_loader.load_all_configs() - - if not configs: - self._report_progress(state, "[yellow]未找到 MCP 配置文件[/yellow]", callback) - return [] - - self._report_progress(state, f"[green]成功加载 {len(configs)} 个 MCP 配置[/green]", callback) - return configs - - async def _process_all_mcp_services( + async def _start_mcp_servers( self, - configs: list[tuple[Path, McpConfig]], state: DeploymentState, callback: Callable[[DeploymentState], None] | None, - ) -> tuple[list[str], list[str]]: - """ - 处理所有 MCP 服务 - - Returns: - tuple[list[str], list[str]]: (os_service_ids, systrace_service_ids) - - """ - os_service_ids = [] - systrace_service_ids = [] - - for config_path, config in configs: - self._report_progress(state, f"[magenta]处理 MCP 服务: {config.name}[/magenta]", callback) - - service_id = await self._process_mcp_service(config, state, callback) - if service_id: - # 根据配置路径判断是否为 sysTrace 相关服务 - if "systrace" in config_path.parent.name.lower() or "systrace" in config.name.lower(): - systrace_service_ids.append(service_id) - else: - os_service_ids.append(service_id) - else: - self._report_progress(state, f"[red]MCP 服务 {config.name} 处理失败[/red]", callback) - - return os_service_ids, systrace_service_ids - - async def _create_multiple_agents( - self, - os_service_ids: list[str], - systrace_service_ids: list[str], - state: DeploymentState, - callback: Callable[[DeploymentState], None] | None, - ) -> str: - """创建多个智能体,返回默认智能体 ID""" - default_app_id = None + ) -> bool: + """运行脚本拉起 MCP Server 进程""" + self._report_progress(state, "[cyan]启动 MCP Server 进程...[/cyan]", callback) - # 创建智能运维 Agent(如果有相应的服务) - if os_service_ids: + if not self.run_script_path or not self.run_script_path.exists(): self._report_progress( state, - f"[bold cyan]创建智能运维 Agent (包含 {len(os_service_ids)} 个 MCP 服务)[/bold cyan]", + f"[red]MCP 启动脚本不存在: {self.run_script_path}[/red]", callback, ) + logger.error("MCP 启动脚本不存在: %s", self.run_script_path) + return False - os_app_id = await self.api_client.create_agent( - "智能运维", - "openEuler 智能运维助手", - os_service_ids, - ) - await self.api_client.publish_agent(os_app_id) - - self._report_progress(state, "[green]智能运维 Agent 创建成功[/green]", callback) - default_app_id = os_app_id # 智能运维作为默认 Agent - - # 创建慢卡检测智能助手(如果有相应的服务) - if systrace_service_ids: - self._report_progress( - state, - f"[bold magenta]创建慢卡检测智能助手 (包含 {len(systrace_service_ids)} 个 MCP 服务)[/bold magenta]", - callback, - ) + try: + # 执行 run.sh 脚本 + cmd = f"bash {self.run_script_path}" + self._report_progress(state, f" [blue]执行命令: {cmd}[/blue]", callback) + logger.info("执行 MCP 启动脚本: %s", cmd) - systrace_app_id = await self.api_client.create_agent( - "慢卡检测智能助手", - "检测集群中的慢卡问题", - systrace_service_ids, + process = await asyncio.create_subprocess_shell( + cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, ) - await self.api_client.publish_agent(systrace_app_id) - - self._report_progress(state, "[green]慢卡检测智能助手创建成功[/green]", callback) - - if default_app_id: - self._report_progress(state, "[dim]保存默认智能体配置...[/dim]", callback) - self.config_manager.set_default_app(default_app_id) - - return default_app_id or "" - async def _register_mcp_service( - self, - config: McpConfig, - state: DeploymentState, - callback: Callable[[DeploymentState], None] | None, - ) -> str: - """注册 MCP 服务""" - self._report_progress(state, f" [blue]注册 {config.name}...[/blue]", callback) - return await self.api_client.register_mcp_service(config) - - async def _install_and_wait_mcp_service( - self, - service_id: str, - config_name: str, - state: DeploymentState, - callback: Callable[[DeploymentState], None] | None, - ) -> bool: - """安装并等待 MCP 服务完成""" - self._report_progress(state, f" [cyan]安装 {config_name} (ID: {service_id})...[/cyan]", callback) - await self.api_client.install_mcp_service(service_id) - - self._report_progress(state, f" [dim]等待 {config_name} 安装完成...[/dim]", callback) - if not await self.api_client.wait_for_installation(service_id): - self._report_progress(state, f" [red]{config_name} 安装超时[/red]", callback) - return False - - return True - - async def _activate_mcp_service( - self, - service_id: str, - config_name: str, - state: DeploymentState, - callback: Callable[[DeploymentState], None] | None, - ) -> None: - """激活 MCP 服务""" - self._report_progress(state, f" [yellow]激活 {config_name}...[/yellow]", callback) - await self.api_client.activate_mcp_service(service_id) - self._report_progress(state, f" [green]{config_name} 处理完成[/green]", callback) + stdout, _ = await process.communicate() + output = stdout.decode("utf-8") if stdout else "" - async def _process_mcp_service( - self, - config: McpConfig, - state: DeploymentState, - callback: Callable[[DeploymentState], None] | None, - ) -> str | None: - """处理单个 MCP 服务""" - # 如果是 SSE 类型,先验证 URL可用且为SSE - if config.mcp_type == "sse": - valid = await self._validate_sse_endpoint(config, state, callback) - if not valid: + if process.returncode == 0: self._report_progress( state, - f" [red]MCP 服务 {config.name} SSE Endpoint 验证失败[/red]", + "[green]MCP Server 启动脚本执行成功[/green]", callback, ) - return None - try: - # 注册服务 - service_id = await self._register_mcp_service(config, state, callback) - - # 安装并等待完成 - if not await self._install_and_wait_mcp_service(service_id, config.name, state, callback): - return None - - # 激活服务 - await self._activate_mcp_service(service_id, config.name, state, callback) - - except (ApiError, httpx.RequestError, Exception) as e: - self._report_progress(state, f" [red]{config.name} 处理失败: {e}[/red]", callback) - logger.exception("MCP 服务 %s 处理失败", config.name) - return None - - else: - return service_id - - async def _validate_sse_endpoint( - self, - config: McpConfig, - state: DeploymentState, - callback: Callable[[DeploymentState], None] | None, - ) -> bool: - """验证 SSE Endpoint 是否可用""" - url = config.config.get("url") or "" - self._report_progress( - state, - f"[magenta]验证 SSE Endpoint: {config.name} -> {url}[/magenta]", - callback, - ) - try: - async with httpx.AsyncClient(timeout=self.api_client.timeout) as client: - response = await client.get( - url, - headers={"Accept": "text/event-stream"}, - ) - if response.status_code != HTTP_OK: - self._report_progress( - state, - f" [red]{config.name} URL 响应码非 200: {response.status_code}[/red]", - callback, - ) - return False - content_type = response.headers.get("content-type", "") - if "text/event-stream" not in content_type: - self._report_progress( - state, - f" [red]{config.name} Content-Type 非 SSE: {content_type}[/red]", - callback, - ) - return False - self._report_progress(state, f" [green]{config.name} SSE Endpoint 验证通过[/green]", callback) + logger.info("MCP Server 启动脚本执行成功") return True - except Exception as e: - self._report_progress(state, f" [red]{config.name} SSE 验证失败:[/red] {e}", callback) - logger.exception("验证 SSE Endpoint 失败: %s", url) - return False - - async def _install_prerequisite_packages( - self, - state: DeploymentState, - callback: Callable[[DeploymentState], None] | None, - ) -> bool: - """安装必要的 RPM 包(已知包可用的情况下)""" - try: - # 检查是否存在以 "systrace" 开头的子目录(不区分大小写) - systrace_exists = self._check_systrace_config(state, callback) - - # 安装包(此时已知包是可用的) - if systrace_exists: - # 安装 sysTrace.rpmlist 中的包 - if not await self._install_rpm_packages("sysTrace.rpmlist", state, callback): - return False - - # 设置 systrace-mcpserver 服务开机启动并立即启动 - if not await self._setup_systrace_service(state, callback): - return False - - # 安装 mcp-servers.rpmlist 中的包 - return await self._install_rpm_packages("mcp-servers.rpmlist", state, callback) except Exception: - error_msg = "安装必要包失败" + error_msg = "执行 MCP Server 启动脚本失败" self._report_progress(state, f"[red]{error_msg}[/red]", callback) logger.exception(error_msg) return False + else: + self._report_progress( + state, + f"[red]MCP Server 启动脚本执行失败 (返回码: {process.returncode})[/red]", + callback, + ) + logger.error("MCP Server 启动脚本执行失败: %s, 输出: %s", cmd, output) + return False - async def _check_rpm_packages_availability( + async def _verify_mcp_services( self, - rpm_list_files: list[str], state: DeploymentState, callback: Callable[[DeploymentState], None] | None, ) -> bool: - """检查 RPM 包是否在 yum 源中可用""" - self._report_progress(state, "[cyan]检查 RPM 包在 yum 源中的可用性...[/cyan]", callback) + """验证 MCP Server 服务状态""" + self._report_progress(state, "[cyan]验证 MCP Server 服务状态...[/cyan]", callback) - if not self.resource_dir: + if not self.service_dir or not self.service_dir.exists(): self._report_progress( state, - "[red]资源目录未找到,无法检查 RPM 包可用性[/red]", + f"[yellow]服务配置目录不存在: {self.service_dir},跳过服务验证[/yellow]", callback, ) - logger.error("资源目录未找到,无法检查 RPM 包可用性") - return False - - all_packages = [] + logger.warning("服务配置目录不存在: %s", self.service_dir) + return True # 不强制要求服务验证 - # 收集所有需要检查的包 - for rpm_list_file in rpm_list_files: - rpm_list_path = self.resource_dir / rpm_list_file - - if not rpm_list_path.exists(): - self._report_progress( - state, - f"[yellow]RPM 列表文件不存在: {rpm_list_file},跳过检查[/yellow]", - callback, - ) - logger.warning("RPM 列表文件不存在: %s", rpm_list_path) - continue - - try: - with rpm_list_path.open(encoding="utf-8") as f: - packages = [line.strip() for line in f if line.strip() and not line.startswith("#")] - all_packages.extend(packages) - except Exception as e: - self._report_progress( - state, - f"[red]读取 RPM 列表文件失败:[/red] {rpm_list_file} - {e}", - callback, - ) - logger.exception("读取 RPM 列表文件失败: %s", rpm_list_path) - return False - - if not all_packages: - self._report_progress(state, "[dim]没有要检查的 RPM 包[/dim]", callback) + # 获取所有 .service 文件 + service_files = list(self.service_dir.glob("*.service")) + if not service_files: + self._report_progress( + state, + "[yellow]未找到服务配置文件,跳过服务验证[/yellow]", + callback, + ) return True - # 检查每个包的可用性 - unavailable_packages = [] + failed_services = [] - for package in all_packages: - # 使用 dnf list available 检查包是否可用 - check_cmd = f"dnf list available {package}" + for service_file in service_files: + service_name = service_file.stem # 去掉 .service 后缀 + self._report_progress( + state, + f" [magenta]检查服务状态: {service_name}[/magenta]", + callback, + ) try: + # 检查服务状态 + cmd = f"systemctl is-active {service_name}" process = await asyncio.create_subprocess_shell( - check_cmd, + cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) - stdout, stderr = await process.communicate() + stdout, _ = await process.communicate() + status = stdout.decode("utf-8").strip() if stdout else "" - if process.returncode != 0: - unavailable_packages.append(package) - logger.warning("RPM 包不可用: %s", package) + if status == "active": + self._report_progress( + state, + f" [green]{service_name} 状态正常[/green]", + callback, + ) + logger.info("服务状态正常: %s", service_name) + else: + self._report_progress( + state, + f" [red]{service_name} 状态异常: {status}[/red]", + callback, + ) + logger.warning("服务状态异常: %s -> %s", service_name, status) + failed_services.append(service_name) - except Exception as e: + except Exception: self._report_progress( state, - f" [red]检查包 {package} 失败:[/red] {e}", + f" [red]检查 {service_name} 状态失败[/red]", callback, ) - logger.exception("检查 RPM 包可用性失败: %s", package) - unavailable_packages.append(package) + logger.exception("检查服务状态失败: %s", service_name) + failed_services.append(service_name) - # 如果有不可用的包,返回 False - if unavailable_packages: + if failed_services: self._report_progress( state, - f"[dim]以下 RPM 包不可用,跳过智能体初始化: {', '.join(unavailable_packages)}[/dim]", + f"[yellow]部分服务状态异常: {', '.join(failed_services)},但继续执行[/yellow]", callback, ) - logger.error("发现不可用的 RPM 包,跳过智能体初始化: %s", unavailable_packages) - return False + logger.warning("部分服务状态异常,但继续执行: %s", failed_services) - self._report_progress( - state, - "[green]所有 RPM 包均可用,继续智能体初始化[/green]", - callback, - ) - logger.info("所有 RPM 包均可用") - return True + self._report_progress(state, "[green]MCP Server 服务验证完成[/green]", callback) + return True # 即使有服务异常也继续执行 - async def _check_prerequisite_packages_availability( + async def _register_all_mcp_services( self, state: DeploymentState, callback: Callable[[DeploymentState], None] | None, - ) -> AgentInitStatus: + ) -> dict[str, str]: """ - 检查必要的 RPM 包是否在 yum 源中可用 + 注册所有 MCP 服务 Returns: - AgentInitStatus: SUCCESS 表示所有包可用,SKIPPED 表示有包不可用应跳过 + dict[str, str]: MCP 路径名 -> 服务 ID 的映射 """ - try: - # 准备要检查的 RPM 列表文件 - rpm_files_to_check = ["mcp-servers.rpmlist"] + self._report_progress(state, "[cyan]注册 MCP 服务...[/cyan]", callback) - # 检查是否存在以 "systrace" 开头的子目录(不区分大小写) - systrace_exists = self._check_systrace_config(state, callback) - if systrace_exists: - rpm_files_to_check.append("sysTrace.rpmlist") + # 加载 MCP 配置 + configs = await self._load_mcp_configs(state, callback) + if not configs: + return {} - # 检查包可用性 - packages_available = await self._check_rpm_packages_availability(rpm_files_to_check, state, callback) + mcp_service_mapping = {} - if not packages_available: + for config_path, config in configs: + service_id = await self._process_mcp_service(config, state, callback) + if service_id: + # 使用配置目录名作为 MCP 路径名 + mcp_path_name = config_path.parent.name + mcp_service_mapping[mcp_path_name] = service_id self._report_progress( state, - "[yellow]MCP Server 相关 RPM 包可用性检查失败,跳过智能体初始化,其他部署步骤将继续进行[/yellow]", + f" [green]{config.name} 注册成功: {mcp_path_name} -> {service_id}[/green]", + callback, + ) + else: + self._report_progress( + state, + f" [red]MCP 服务 {config.name} 注册失败[/red]", callback, ) - return AgentInitStatus.SKIPPED - - except Exception: - error_msg = "检查 RPM 包可用性失败" - self._report_progress(state, f"[red]{error_msg}[/red]", callback) - logger.exception(error_msg) - return AgentInitStatus.SKIPPED # 检查失败也视为跳过,而不是整个部署失败 - else: - return AgentInitStatus.SUCCESS + self._report_progress( + state, + f"[green]MCP 服务注册完成,成功 {len(mcp_service_mapping)} 个[/green]", + callback, + ) + return mcp_service_mapping - def _check_systrace_config( + async def _create_agents_from_config( self, + mcp_service_mapping: dict[str, str], state: DeploymentState, callback: Callable[[DeploymentState], None] | None, - ) -> bool: - """检查是否存在以 systrace 开头的配置目录""" - self._report_progress(state, "[cyan]检查 sysTrace 配置...[/cyan]", callback) + ) -> str | None: + """从配置文件创建智能体""" + self._report_progress(state, "[cyan]读取应用配置并创建智能体...[/cyan]", callback) - if not self.resource_dir or not self.mcp_config_dir: - self._report_progress(state, "[yellow]资源目录或 MCP 配置目录不存在[/yellow]", callback) - return False + # 读取应用配置 + app_configs = await self._load_app_configs(state, callback) + if not app_configs: + return None - if not self.mcp_config_dir.exists(): - self._report_progress(state, "[yellow]MCP 配置目录不存在[/yellow]", callback) - return False + created_agents = [] + default_app_id = None - for subdir in self.mcp_config_dir.iterdir(): - if subdir.is_dir() and subdir.name.lower().startswith("systrace"): - self._report_progress(state, f"[green]发现 sysTrace 配置: {subdir.name}[/green]", callback) - logger.info("发现 sysTrace 配置目录: %s", subdir.name) - return True + for i, app_config in enumerate(app_configs): + app_id = await self._create_single_agent( + app_config, + mcp_service_mapping, + state, + callback, + ) + + if app_id: + created_agents.append(app_id) + + # 第一个智能体设置为默认智能体 + if i == 0: + default_app_id = app_id + self._report_progress( + state, + f" [dim]设置默认智能体: {app_config.name}[/dim]", + callback, + ) + self.config_manager.set_default_app(app_id) + + if created_agents: + self._report_progress( + state, + f"[green]成功创建 {len(created_agents)} 个智能体[/green]", + callback, + ) + return default_app_id - self._report_progress(state, "[dim]未发现 sysTrace 配置[/dim]", callback) - return False + self._report_progress(state, "[red]未能创建任何智能体[/red]", callback) + return None - async def _install_rpm_packages( + async def _create_single_agent( self, - rpm_list_file: str, + app_config: AppConfig, + mcp_service_mapping: dict[str, str], state: DeploymentState, callback: Callable[[DeploymentState], None] | None, - ) -> bool: - """安装指定 RPM 列表文件中的包""" - if not self.resource_dir: + ) -> str | None: + """创建单个智能体""" + self._report_progress( + state, + f"[magenta]创建智能体: {app_config.name}[/magenta]", + callback, + ) + + # 将 MCP 路径转换为服务 ID + mcp_service_ids, missing_services = self._resolve_mcp_services( + app_config.mcp_path, + mcp_service_mapping, + ) + + if missing_services: self._report_progress( state, - f"[red]资源目录未找到,无法安装 {rpm_list_file}[/red]", + f" [yellow]缺少 MCP 服务: {', '.join(missing_services)},跳过[/yellow]", callback, ) - logger.error("资源目录未找到,无法安装 RPM 包: %s", rpm_list_file) - return False - - rpm_list_path = self.resource_dir / rpm_list_file + logger.warning("智能体 %s 缺少 MCP 服务: %s", app_config.name, missing_services) + return None - if not rpm_list_path.exists(): + if not mcp_service_ids: self._report_progress( state, - f"[yellow]RPM 列表文件不存在: {rpm_list_file}[/yellow]", + f" [yellow]智能体 {app_config.name} 没有可用的 MCP 服务,跳过[/yellow]", callback, ) - logger.warning("RPM 列表文件不存在: %s", rpm_list_path) - return True # 文件不存在不算失败,继续执行 - - self._report_progress(state, f"[cyan]安装 {rpm_list_file} 中的 RPM 包...[/cyan]", callback) + return None try: - # 读取 RPM 包列表 - with rpm_list_path.open(encoding="utf-8") as f: - packages = [line.strip() for line in f if line.strip() and not line.startswith("#")] - - if not packages: - self._report_progress(state, f"[dim]{rpm_list_file} 中没有要安装的包[/dim]", callback) - return True + # 创建智能体 + app_id = await self.api_client.create_agent( + app_config.name, + app_config.description, + mcp_service_ids, + ) - # 使用 dnf 安装包 - package_list = " ".join(packages) - install_cmd = f"sudo dnf install -y {package_list}" + # 发布智能体(如果配置要求发布) + if app_config.published: + await self.api_client.publish_agent(app_id) + except Exception: self._report_progress( state, - f" [blue]执行安装命令: {install_cmd}[/blue]", + f" [red]创建智能体 {app_config.name} 失败[/red]", callback, ) - logger.info("执行 RPM 包安装命令: %s", install_cmd) - - # 使用 asyncio.create_subprocess_shell 执行命令 - process = await asyncio.create_subprocess_shell( - install_cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, + logger.exception("创建智能体失败: %s", app_config.name) + return None + else: + self._report_progress( + state, + f" [green]智能体 {app_config.name} 创建成功: {app_id}[/green]", + callback, ) + return app_id - stdout, _ = await process.communicate() - output = stdout.decode("utf-8") if stdout else "" + def _resolve_mcp_services( + self, + mcp_paths: list[str], + mcp_service_mapping: dict[str, str], + ) -> tuple[list[str], list[str]]: + """解析 MCP 路径为服务 ID""" + mcp_service_ids = [] + missing_services = [] - if process.returncode == 0: - self._report_progress( - state, - f" [green]{rpm_list_file} 中的包安装成功[/green]", - callback, - ) - logger.info("RPM 包安装成功: %s", package_list) + for mcp_path in mcp_paths: + if mcp_path in mcp_service_mapping: + mcp_service_ids.append(mcp_service_mapping[mcp_path]) else: + missing_services.append(mcp_path) + + return mcp_service_ids, missing_services + + async def _load_app_configs( + self, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> list[AppConfig]: + """加载应用配置""" + self._report_progress(state, "[cyan]加载应用配置文件...[/cyan]", callback) + + if not self.app_config_path or not self.app_config_path.exists(): + self._report_progress( + state, + f"[red]应用配置文件不存在: {self.app_config_path}[/red]", + callback, + ) + logger.error("应用配置文件不存在: %s", self.app_config_path) + return [] + + try: + with self.app_config_path.open(encoding="utf-8") as f: + config_data = toml.load(f) + + applications = config_data.get("applications", []) + if not applications: self._report_progress( state, - f" [red]{rpm_list_file} 中的包安装失败 (返回码: {process.returncode})[/red]", + "[yellow]配置文件中没有找到应用定义[/yellow]", callback, ) - logger.error("RPM 包安装失败: %s, 输出: %s", package_list, output) - return False + logger.warning("配置文件中没有找到应用定义") + return [] + + app_configs = [] + for app_data in applications: + try: + app_config = AppConfig( + app_type=app_data.get("appType", "agent"), + name=app_data["name"], + description=app_data["description"], + mcp_path=app_data["mcpPath"], + published=app_data.get("published", True), + ) + app_configs.append(app_config) + except KeyError as e: + self._report_progress( + state, + f" [red]应用配置缺少必需字段: {e}[/red]", + callback, + ) + logger.exception("应用配置缺少必需字段") + continue except Exception: - error_msg = f"安装 {rpm_list_file} 失败" - self._report_progress(state, f" [red]{error_msg}[/red]", callback) + error_msg = f"加载应用配置文件失败: {self.app_config_path}" + self._report_progress(state, f"[red]{error_msg}[/red]", callback) logger.exception(error_msg) - return False - - return True + return [] + else: + self._report_progress( + state, + f"[green]成功加载 {len(app_configs)} 个应用配置[/green]", + callback, + ) + return app_configs - async def _setup_systrace_service( + async def _load_mcp_configs( self, state: DeploymentState, callback: Callable[[DeploymentState], None] | None, - ) -> bool: - """设置 systrace-mcpserver 服务""" - service_name = "systrace-mcpserver" - self._report_progress(state, f"[magenta]设置 {service_name} 服务...[/magenta]", callback) + ) -> list[tuple[Path, McpConfig]]: + """加载 MCP 配置""" + self._report_progress(state, "[cyan]加载 MCP 配置文件...[/cyan]", callback) - try: - # 启用服务开机启动 - enable_cmd = f"sudo systemctl enable {service_name}" - self._report_progress(state, f" [cyan]设置开机启动: {enable_cmd}[/cyan]", callback) + config_loader = McpConfigLoader(self.mcp_config_dir) + configs = config_loader.load_all_configs() - process = await asyncio.create_subprocess_shell( - enable_cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, - ) + if not configs: + self._report_progress(state, "[yellow]未找到 MCP 配置文件[/yellow]", callback) + return [] - stdout, _ = await process.communicate() - output = stdout.decode("utf-8") if stdout else "" + self._report_progress(state, f"[green]成功加载 {len(configs)} 个 MCP 配置[/green]", callback) + return configs - if process.returncode != 0: + async def _process_mcp_service( + self, + config: McpConfig, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> str | None: + """处理单个 MCP 服务""" + # 如果是 SSE 类型,先验证 URL 可用且为 SSE + if config.mcp_type == "sse": + valid = await self._validate_sse_endpoint(config, state, callback) + if not valid: self._report_progress( state, - f" [red]设置 {service_name} 开机启动失败: {output}[/red]", + f" [red]MCP 服务 {config.name} SSE Endpoint 验证失败[/red]", callback, ) - logger.error("设置服务开机启动失败: %s, 输出: %s", service_name, output) - return False + return None + try: + # 注册服务 + service_id = await self._register_mcp_service(config, state, callback) - # 启动服务 - start_cmd = f"sudo systemctl start {service_name}" - self._report_progress(state, f" [blue]启动服务: {start_cmd}[/blue]", callback) + # 安装并等待完成 + if not await self._install_and_wait_mcp_service(service_id, config.name, state, callback): + return None - process = await asyncio.create_subprocess_shell( - start_cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, - ) + # 激活服务 + await self._activate_mcp_service(service_id, config.name, state, callback) - stdout, _ = await process.communicate() - output = stdout.decode("utf-8") if stdout else "" + except (ApiError, httpx.RequestError, Exception) as e: + self._report_progress(state, f" [red]{config.name} 处理失败: {e}[/red]", callback) + logger.exception("MCP 服务 %s 处理失败", config.name) + return None - if process.returncode == 0: - self._report_progress( - state, - f" [green]{service_name} 服务启动成功[/green]", - callback, - ) - logger.info("sysTrace 服务启动成功: %s", service_name) - else: - self._report_progress( - state, - f" [red]{service_name} 服务启动失败: {output}[/red]", - callback, - ) - logger.error("sysTrace 服务启动失败: %s, 输出: %s", service_name, output) - return False + return service_id - except Exception: - error_msg = f"设置 {service_name} 服务失败" - self._report_progress(state, f" [red]{error_msg}[/red]", callback) - logger.exception(error_msg) + async def _register_mcp_service( + self, + config: McpConfig, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> str: + """注册 MCP 服务""" + self._report_progress(state, f" [blue]注册 {config.name}...[/blue]", callback) + return await self.api_client.register_mcp_service(config) + + async def _install_and_wait_mcp_service( + self, + service_id: str, + config_name: str, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> bool: + """安装并等待 MCP 服务完成""" + self._report_progress(state, f" [cyan]安装 {config_name} (ID: {service_id})...[/cyan]", callback) + await self.api_client.install_mcp_service(service_id) + + self._report_progress(state, f" [dim]等待 {config_name} 安装完成...[/dim]", callback) + if not await self.api_client.wait_for_installation(service_id): + self._report_progress(state, f" [red]{config_name} 安装超时[/red]", callback) return False return True + + async def _activate_mcp_service( + self, + service_id: str, + config_name: str, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> None: + """激活 MCP 服务""" + self._report_progress(state, f" [yellow]激活 {config_name}...[/yellow]", callback) + await self.api_client.activate_mcp_service(service_id) + self._report_progress(state, f" [green]{config_name} 处理完成[/green]", callback) + + async def _validate_sse_endpoint( + self, + config: McpConfig, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> bool: + """验证 SSE Endpoint 是否可用""" + url = config.config.get("url") or "" + self._report_progress( + state, + f"[magenta]验证 SSE Endpoint: {config.name} -> {url}[/magenta]", + callback, + ) + try: + async with httpx.AsyncClient(timeout=self.api_client.timeout) as client: + response = await client.get( + url, + headers={"Accept": "text/event-stream"}, + ) + if response.status_code != HTTP_OK: + self._report_progress( + state, + f" [red]{config.name} URL 响应码非 200: {response.status_code}[/red]", + callback, + ) + return False + content_type = response.headers.get("content-type", "") + if "text/event-stream" not in content_type: + self._report_progress( + state, + f" [red]{config.name} Content-Type 非 SSE: {content_type}[/red]", + callback, + ) + return False + self._report_progress(state, f" [green]{config.name} SSE Endpoint 验证通过[/green]", callback) + return True + except Exception as e: + self._report_progress(state, f" [red]{config.name} SSE 验证失败:[/red] {e}", callback) + logger.exception("验证 SSE Endpoint 失败: %s", url) + return False -- Gitee From 73329455efde3f5e363b0c7baa7cac808b45b904 Mon Sep 17 00:00:00 2001 From: Hongyu Shi Date: Fri, 5 Sep 2025 16:57:08 +0800 Subject: [PATCH 3/7] =?UTF-8?q?fix(deploy):=20=E4=BF=AE=E6=AD=A3=E7=89=88?= =?UTF-8?q?=E6=9C=AC=E6=A3=80=E6=9F=A5=E5=87=BD=E6=95=B0=E5=8F=82=E6=95=B0?= =?UTF-8?q?=E9=A1=BA=E5=BA=8F=E4=BB=A5=E7=A1=AE=E4=BF=9D=E6=AD=A3=E7=A1=AE?= =?UTF-8?q?=E5=A4=84=E7=90=86=E6=94=AF=E6=8C=81=E7=9A=84=E7=89=88=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Hongyu Shi --- scripts/deploy/1-check-env/check_env.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/scripts/deploy/1-check-env/check_env.sh b/scripts/deploy/1-check-env/check_env.sh index 718ea4b..021034e 100644 --- a/scripts/deploy/1-check-env/check_env.sh +++ b/scripts/deploy/1-check-env/check_env.sh @@ -181,8 +181,8 @@ function check_user { function check_version { local current_version_id="$1" - local supported_versions=("${@:2}") - local sp="$3" + local sp="$2" + local supported_versions=("${@:3}") echo -e "${COLOR_INFO}[Info] 当前操作系统版本为:$current_version_id LTS-$sp${COLOR_RESET}" for version_id in "${supported_versions[@]}"; do if [[ "$current_version_id" == "$version_id" ]]; then @@ -226,7 +226,7 @@ function check_os_version { case $id in "openEuler") local supported_versions=("22.03" "24.03" "25.03" "25.09") - check_version "$version" "${supported_versions[@]}" "$sp" + check_version "$version" "$sp" "${supported_versions[@]}" ;; "hce") echo -e "${COLOR_INFO}[Info] 检测到 HCE 发行版,跳过版本检查${COLOR_RESET}" -- Gitee From c2f38291f9799f9430beab457aed2da28b11047e Mon Sep 17 00:00:00 2001 From: Hongyu Shi Date: Fri, 5 Sep 2025 17:33:28 +0800 Subject: [PATCH 4/7] =?UTF-8?q?refactor(deploy-agent):=20=E9=87=8D?= =?UTF-8?q?=E6=9E=84=E6=99=BA=E8=83=BD=E4=BD=93=E5=88=9D=E5=A7=8B=E5=8C=96?= =?UTF-8?q?=E6=B5=81=E7=A8=8B=EF=BC=8C=E6=8F=90=E5=8F=96=E5=88=9D=E5=A7=8B?= =?UTF-8?q?=E5=8C=96=E6=AD=A5=E9=AA=A4=E4=B8=BA=E7=8B=AC=E7=AB=8B=E6=96=B9?= =?UTF-8?q?=E6=B3=95=E5=B9=B6=E5=A2=9E=E5=BC=BA=E6=9C=8D=E5=8A=A1=E9=AA=8C?= =?UTF-8?q?=E8=AF=81=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Hongyu Shi --- src/app/deployment/agent.py | 393 ++++++++++++++++++++++++++++-------- 1 file changed, 306 insertions(+), 87 deletions(-) diff --git a/src/app/deployment/agent.py b/src/app/deployment/agent.py index 6fca86a..48762dd 100644 --- a/src/app/deployment/agent.py +++ b/src/app/deployment/agent.py @@ -27,7 +27,7 @@ from log.manager import get_logger from .models import AgentInitStatus, DeploymentState if TYPE_CHECKING: - from collections.abc import Callable + from collections.abc import Awaitable, Callable logger = get_logger(__name__) @@ -400,45 +400,58 @@ class AgentManager: self._report_progress(state, "[bold blue]开始初始化智能体...[/bold blue]", progress_callback) try: - # 1. 运行脚本拉起 MCP Server 进程 - if not await self._start_mcp_servers(state, progress_callback): - return AgentInitStatus.FAILED - - # 2. 验证 MCP Server 服务状态 - if not await self._verify_mcp_services(state, progress_callback): - return AgentInitStatus.FAILED - - # 3. 加载 MCP 配置并注册服务 - mcp_service_mapping = await self._register_all_mcp_services(state, progress_callback) - if not mcp_service_mapping: - return AgentInitStatus.FAILED - - # 4. 读取应用配置并创建智能体 - default_app_id = await self._create_agents_from_config( - mcp_service_mapping, - state, - progress_callback, - ) - - if default_app_id: - self._report_progress( - state, - f"[bold green]智能体初始化完成! 默认 App ID: {default_app_id}[/bold green]", - progress_callback, - ) - logger.info("智能体初始化成功完成,默认 App ID: %s", default_app_id) - return AgentInitStatus.SUCCESS + # 执行所有初始化步骤 + return await self._execute_initialization_steps(state, progress_callback) except Exception: error_msg = "智能体初始化失败" self._report_progress(state, f"[red]{error_msg}[/red]", progress_callback) logger.exception(error_msg) return AgentInitStatus.FAILED - else: - # 如果没有创建任何智能体,显示警告并返回成功状态 - self._report_progress(state, "[yellow]未能创建任何智能体[/yellow]", progress_callback) + + async def _execute_initialization_steps( + self, + state: DeploymentState, + progress_callback: Callable[[DeploymentState], None] | None, + ) -> AgentInitStatus: + """执行所有初始化步骤""" + # 1. 安装 systemd 服务文件 + if not await self._install_service_files(state, progress_callback): + return AgentInitStatus.FAILED + + # 2. 运行脚本拉起 MCP Server 进程 + if not await self._start_mcp_servers(state, progress_callback): + return AgentInitStatus.FAILED + + # 3. 验证 MCP Server 服务状态 + if not await self._verify_mcp_services(state, progress_callback): + return AgentInitStatus.FAILED + + # 4. 加载 MCP 配置并注册服务 + mcp_service_mapping = await self._register_all_mcp_services(state, progress_callback) + if not mcp_service_mapping: + return AgentInitStatus.FAILED + + # 5. 读取应用配置并创建智能体 + default_app_id = await self._create_agents_from_config( + mcp_service_mapping, + state, + progress_callback, + ) + + if default_app_id: + self._report_progress( + state, + f"[bold green]智能体初始化完成! 默认 App ID: {default_app_id}[/bold green]", + progress_callback, + ) + logger.info("智能体初始化成功完成,默认 App ID: %s", default_app_id) return AgentInitStatus.SUCCESS + # 如果没有创建任何智能体,显示警告并返回成功状态 + self._report_progress(state, "[yellow]未能创建任何智能体[/yellow]", progress_callback) + return AgentInitStatus.SUCCESS + def _report_progress( self, state: DeploymentState, @@ -450,6 +463,216 @@ class AgentManager: if callback: callback(state) + def _get_service_files( + self, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + operation_name: str, + ) -> list[Path] | None: + """ + 获取服务文件列表的通用方法 + + Returns: + list[Path]: 服务文件列表,如果应该跳过操作则返回 None + + """ + if not self.service_dir or not self.service_dir.exists(): + self._report_progress( + state, + f"[yellow]服务配置目录不存在: {self.service_dir},跳过{operation_name}[/yellow]", + callback, + ) + logger.warning("服务配置目录不存在: %s", self.service_dir) + return None + + # 获取所有 .service 文件 + service_files = list(self.service_dir.glob("*.service")) + if not service_files: + self._report_progress( + state, + f"[yellow]未找到服务配置文件,跳过{operation_name}[/yellow]", + callback, + ) + return None + + return service_files + + async def _process_service_files( + self, + service_files: list[Path], + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + processor_func: Callable[ + [Path, DeploymentState, Callable[[DeploymentState], None] | None], + Awaitable[tuple[bool, str]], + ], + ) -> tuple[bool, list[str], list[str]]: + """ + 处理服务文件的通用框架 + + Args: + service_files: 要处理的服务文件列表 + state: 部署状态 + callback: 进度回调函数 + processor_func: 处理单个文件的函数,返回 (成功标志, 文件名) + + Returns: + tuple[bool, list[str], list[str]]: (总体是否成功, 成功的文件列表, 失败的文件列表) + + """ + success_files = [] + failed_files = [] + + for service_file in service_files: + try: + success, file_identifier = await processor_func(service_file, state, callback) + if success: + success_files.append(file_identifier) + else: + failed_files.append(file_identifier) + except Exception: + file_identifier = service_file.stem + self._report_progress( + state, + f" [red]处理 {file_identifier} 时发生异常[/red]", + callback, + ) + logger.exception("处理服务文件时发生异常: %s", service_file) + failed_files.append(file_identifier) + + return len(failed_files) == 0, success_files, failed_files + + async def _install_service_files( + self, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> bool: + """安装 systemd 服务文件""" + self._report_progress(state, "[cyan]安装 systemd 服务文件...[/cyan]", callback) + + # 获取服务文件列表 + service_files = self._get_service_files(state, callback, "服务文件安装") + if service_files is None: + return True + + # 处理所有服务文件 + overall_success, installed_files, failed_files = await self._process_service_files( + service_files, state, callback, self._install_single_service_file, + ) + + # 如果有成功安装的文件,重新加载 systemd 配置 + if installed_files: + if not await self._reload_systemd_daemon(state, callback): + return False + + self._report_progress( + state, + f"[green]成功安装 {len(installed_files)} 个服务文件[/green]", + callback, + ) + + return True + + async def _install_single_service_file( + self, + service_file: Path, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> tuple[bool, str]: + """安装单个服务文件""" + service_name = service_file.name + systemd_dir = Path("/etc/systemd/system") + target_path = systemd_dir / service_name + + self._report_progress( + state, + f" [blue]复制服务文件: {service_name}[/blue]", + callback, + ) + + try: + # 复制服务文件到 systemd 目录 + cmd = f"sudo cp {service_file} {target_path}" + process = await asyncio.create_subprocess_shell( + cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + stdout, stderr = await process.communicate() + + except Exception: + self._report_progress( + state, + f" [red]复制 {service_name} 时发生异常[/red]", + callback, + ) + logger.exception("复制服务文件时发生异常: %s", service_file) + return False, service_name + else: + if process.returncode == 0: + self._report_progress( + state, + f" [green]{service_name} 复制成功[/green]", + callback, + ) + logger.info("服务文件复制成功: %s -> %s", service_file, target_path) + return True, service_name + + error_output = stderr.decode("utf-8") if stderr else "" + self._report_progress( + state, + f" [red]{service_name} 复制失败: {error_output}[/red]", + callback, + ) + logger.error("服务文件复制失败: %s, 错误: %s", service_file, error_output) + return False, service_name + + async def _reload_systemd_daemon( + self, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> bool: + """重新加载 systemd 配置""" + self._report_progress(state, "[cyan]重新加载 systemd 配置...[/cyan]", callback) + + try: + cmd = "sudo systemctl daemon-reload" + process = await asyncio.create_subprocess_shell( + cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + stdout, stderr = await process.communicate() + + except Exception: + self._report_progress( + state, + "[red]重新加载 systemd 配置时发生异常[/red]", + callback, + ) + logger.exception("重新加载 systemd 配置时发生异常") + return False + else: + if process.returncode == 0: + self._report_progress( + state, + "[green]systemd 配置重新加载成功[/green]", + callback, + ) + logger.info("systemd 配置重新加载成功") + return True + + error_output = stderr.decode("utf-8") if stderr else "" + self._report_progress( + state, + f"[red]systemd 配置重新加载失败: {error_output}[/red]", + callback, + ) + logger.error("systemd 配置重新加载失败: %s", error_output) + return False + async def _start_mcp_servers( self, state: DeploymentState, @@ -513,82 +736,78 @@ class AgentManager: """验证 MCP Server 服务状态""" self._report_progress(state, "[cyan]验证 MCP Server 服务状态...[/cyan]", callback) - if not self.service_dir or not self.service_dir.exists(): + # 获取服务文件列表 + service_files = self._get_service_files(state, callback, "服务验证") + if service_files is None: + return True + + # 处理所有服务文件 + overall_success, active_services, failed_services = await self._process_service_files( + service_files, state, callback, self._verify_single_service, + ) + + if failed_services: self._report_progress( state, - f"[yellow]服务配置目录不存在: {self.service_dir},跳过服务验证[/yellow]", + f"[yellow]部分服务状态异常: {', '.join(failed_services)},但继续执行[/yellow]", callback, ) - logger.warning("服务配置目录不存在: %s", self.service_dir) - return True # 不强制要求服务验证 + logger.warning("部分服务状态异常,但继续执行: %s", failed_services) - # 获取所有 .service 文件 - service_files = list(self.service_dir.glob("*.service")) - if not service_files: - self._report_progress( - state, - "[yellow]未找到服务配置文件,跳过服务验证[/yellow]", - callback, + self._report_progress(state, "[green]MCP Server 服务验证完成[/green]", callback) + return True # 即使有服务异常也继续执行 + + async def _verify_single_service( + self, + service_file: Path, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> tuple[bool, str]: + """验证单个服务状态""" + service_name = service_file.stem # 去掉 .service 后缀 + self._report_progress( + state, + f" [magenta]检查服务状态: {service_name}[/magenta]", + callback, + ) + + try: + # 检查服务状态 + cmd = f"systemctl is-active {service_name}" + process = await asyncio.create_subprocess_shell( + cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, ) - return True - failed_services = [] + stdout, _ = await process.communicate() + status = stdout.decode("utf-8").strip() if stdout else "" - for service_file in service_files: - service_name = service_file.stem # 去掉 .service 后缀 + except Exception: self._report_progress( state, - f" [magenta]检查服务状态: {service_name}[/magenta]", + f" [red]检查 {service_name} 状态失败[/red]", callback, ) - - try: - # 检查服务状态 - cmd = f"systemctl is-active {service_name}" - process = await asyncio.create_subprocess_shell( - cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - - stdout, _ = await process.communicate() - status = stdout.decode("utf-8").strip() if stdout else "" - - if status == "active": - self._report_progress( - state, - f" [green]{service_name} 状态正常[/green]", - callback, - ) - logger.info("服务状态正常: %s", service_name) - else: - self._report_progress( - state, - f" [red]{service_name} 状态异常: {status}[/red]", - callback, - ) - logger.warning("服务状态异常: %s -> %s", service_name, status) - failed_services.append(service_name) - - except Exception: + logger.exception("检查服务状态失败: %s", service_name) + return False, service_name + else: + if status == "active": self._report_progress( state, - f" [red]检查 {service_name} 状态失败[/red]", + f" [green]{service_name} 状态正常[/green]", callback, ) - logger.exception("检查服务状态失败: %s", service_name) - failed_services.append(service_name) + logger.info("服务状态正常: %s", service_name) + return True, service_name - if failed_services: self._report_progress( state, - f"[yellow]部分服务状态异常: {', '.join(failed_services)},但继续执行[/yellow]", + f" [red]{service_name} 状态异常: {status}[/red]", callback, ) - logger.warning("部分服务状态异常,但继续执行: %s", failed_services) - - self._report_progress(state, "[green]MCP Server 服务验证完成[/green]", callback) - return True # 即使有服务异常也继续执行 + logger.warning("服务状态异常: %s -> %s", service_name, status) + return False, service_name async def _register_all_mcp_services( self, -- Gitee From 7c05f7604ea09fbc190337d3bd85e1af8e24113d Mon Sep 17 00:00:00 2001 From: Hongyu Shi Date: Fri, 5 Sep 2025 17:48:28 +0800 Subject: [PATCH 5/7] =?UTF-8?q?feat(deploy-agent):=20=E6=B7=BB=E5=8A=A0=20?= =?UTF-8?q?SSE=20Endpoint=20=E9=AA=8C=E8=AF=81=E9=87=8D=E8=AF=95=E6=9C=BA?= =?UTF-8?q?=E5=88=B6=EF=BC=8C=E5=A2=9E=E5=BC=BA=E8=BF=9E=E6=8E=A5=E7=A8=B3?= =?UTF-8?q?=E5=AE=9A=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Hongyu Shi --- src/app/deployment/agent.py | 83 +++++++++++++++++++++++++------------ 1 file changed, 57 insertions(+), 26 deletions(-) diff --git a/src/app/deployment/agent.py b/src/app/deployment/agent.py index 48762dd..bf25255 100644 --- a/src/app/deployment/agent.py +++ b/src/app/deployment/agent.py @@ -1156,30 +1156,61 @@ class AgentManager: f"[magenta]验证 SSE Endpoint: {config.name} -> {url}[/magenta]", callback, ) - try: - async with httpx.AsyncClient(timeout=self.api_client.timeout) as client: - response = await client.get( - url, - headers={"Accept": "text/event-stream"}, - ) - if response.status_code != HTTP_OK: - self._report_progress( - state, - f" [red]{config.name} URL 响应码非 200: {response.status_code}[/red]", - callback, - ) - return False - content_type = response.headers.get("content-type", "") - if "text/event-stream" not in content_type: - self._report_progress( - state, - f" [red]{config.name} Content-Type 非 SSE: {content_type}[/red]", - callback, + + # 重试配置 + max_attempts = 36 # 3分钟 / 5秒 = 36次 + retry_interval = 5 # 5秒重试间隔 + + for attempt in range(1, max_attempts + 1): + try: + async with httpx.AsyncClient(timeout=self.api_client.timeout) as client: + response = await client.get( + url, + headers={"Accept": "text/event-stream"}, ) - return False - self._report_progress(state, f" [green]{config.name} SSE Endpoint 验证通过[/green]", callback) - return True - except Exception as e: - self._report_progress(state, f" [red]{config.name} SSE 验证失败:[/red] {e}", callback) - logger.exception("验证 SSE Endpoint 失败: %s", url) - return False + + if response.status_code != HTTP_OK: + logger.debug( + "SSE Endpoint 响应码非 200: %s, 状态码: %d, 尝试: %d/%d", + url, response.status_code, attempt, max_attempts, + ) + else: + content_type = response.headers.get("content-type", "") + if "text/event-stream" not in content_type: + self._report_progress( + state, + f" [yellow]{config.name} Content-Type 非 SSE: {content_type}[/yellow]", + callback, + ) + logger.debug( + "SSE Endpoint Content-Type 非 SSE: %s, Content-Type: %s, 尝试: %d/%d", + url, content_type, attempt, max_attempts, + ) + else: + # 验证成功 + self._report_progress( + state, + f" [green]{config.name} SSE Endpoint 验证通过[/green]", + callback, + ) + logger.info("SSE Endpoint 验证成功: %s (尝试 %d 次)", url, attempt) + return True + + except (httpx.RequestError, httpx.HTTPStatusError) as e: + logger.debug("SSE Endpoint 连接失败: %s, 错误: %s, 尝试: %d/%d", url, e, attempt, max_attempts) + + # 如果还有重试机会,等待后继续 + if attempt < max_attempts: + await asyncio.sleep(retry_interval) + + # 所有尝试都失败了 + self._report_progress( + state, + f" [red]{config.name} SSE Endpoint 验证失败: 3分钟内无法连接[/red]", + callback, + ) + logger.error( + "SSE Endpoint 验证最终失败: %s (尝试了 %d 次,耗时 %d 秒)", + url, max_attempts, max_attempts * retry_interval, + ) + return False -- Gitee From 5342c4b90bab19344bcdfcbd9d3e8d9cbb952c98 Mon Sep 17 00:00:00 2001 From: Hongyu Shi Date: Mon, 8 Sep 2025 10:33:49 +0800 Subject: [PATCH 6/7] =?UTF-8?q?feat(deploy-agent):=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E6=97=A7=E8=BF=9B=E7=A8=8B=E6=B8=85=E7=90=86=E6=9C=BA=E5=88=B6?= =?UTF-8?q?=E5=B9=B6=E4=BC=98=E5=8C=96=E6=9C=8D=E5=8A=A1=E5=90=AF=E5=8A=A8?= =?UTF-8?q?=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Hongyu Shi --- src/app/deployment/agent.py | 81 ++++++++++++++++++++++++++++++++++--- 1 file changed, 76 insertions(+), 5 deletions(-) diff --git a/src/app/deployment/agent.py b/src/app/deployment/agent.py index bf25255..28dfc0a 100644 --- a/src/app/deployment/agent.py +++ b/src/app/deployment/agent.py @@ -14,6 +14,7 @@ from __future__ import annotations import asyncio import json +import subprocess from dataclasses import dataclass from pathlib import Path from typing import TYPE_CHECKING, Any @@ -557,7 +558,10 @@ class AgentManager: # 处理所有服务文件 overall_success, installed_files, failed_files = await self._process_service_files( - service_files, state, callback, self._install_single_service_file, + service_files, + state, + callback, + self._install_single_service_file, ) # 如果有成功安装的文件,重新加载 systemd 配置 @@ -690,6 +694,16 @@ class AgentManager: logger.error("MCP 启动脚本不存在: %s", self.run_script_path) return False + # 1. 先检查并清理可能存在的旧进程 + if not await self._cleanup_old_mcp_processes(state, callback): + # 清理失败不会阻止继续执行,只是记录警告 + self._report_progress( + state, + "[yellow]清理旧进程时遇到问题,但继续执行启动脚本[/yellow]", + callback, + ) + + # 2. 执行启动脚本 try: # 执行 run.sh 脚本 cmd = f"bash {self.run_script_path}" @@ -728,6 +742,52 @@ class AgentManager: logger.error("MCP Server 启动脚本执行失败: %s, 输出: %s", cmd, output) return False + async def _cleanup_old_mcp_processes( + self, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> bool: + """检查并清理可能存在的旧 MCP 进程""" + # 静默获取服务文件列表 + if not self.service_dir or not self.service_dir.exists(): + return True + + service_files = list(self.service_dir.glob("*.service")) + if not service_files: + return True + + # 静默清理服务 + for service_file in service_files: + service_name = service_file.stem # 去掉 .service 后缀 + await self._stop_service(service_name) + + return True + + async def _stop_service(self, service_name: str) -> None: + """静默停止服务""" + try: + # 检查服务状态 + status_cmd = f"systemctl is-active {service_name}" + status_process = await asyncio.create_subprocess_shell( + status_cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, _ = await status_process.communicate() + status = stdout.decode("utf-8").strip() if stdout else "" + + # 如果服务正在运行,静默停止它 + if status == "active": + stop_cmd = f"sudo systemctl stop {service_name}" + await asyncio.create_subprocess_shell( + stop_cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + except (OSError, subprocess.SubprocessError): + # 静默忽略任何错误 + logger.debug("静默停止服务时发生异常: %s", service_name) + async def _verify_mcp_services( self, state: DeploymentState, @@ -743,7 +803,10 @@ class AgentManager: # 处理所有服务文件 overall_success, active_services, failed_services = await self._process_service_files( - service_files, state, callback, self._verify_single_service, + service_files, + state, + callback, + self._verify_single_service, ) if failed_services: @@ -1172,7 +1235,10 @@ class AgentManager: if response.status_code != HTTP_OK: logger.debug( "SSE Endpoint 响应码非 200: %s, 状态码: %d, 尝试: %d/%d", - url, response.status_code, attempt, max_attempts, + url, + response.status_code, + attempt, + max_attempts, ) else: content_type = response.headers.get("content-type", "") @@ -1184,7 +1250,10 @@ class AgentManager: ) logger.debug( "SSE Endpoint Content-Type 非 SSE: %s, Content-Type: %s, 尝试: %d/%d", - url, content_type, attempt, max_attempts, + url, + content_type, + attempt, + max_attempts, ) else: # 验证成功 @@ -1211,6 +1280,8 @@ class AgentManager: ) logger.error( "SSE Endpoint 验证最终失败: %s (尝试了 %d 次,耗时 %d 秒)", - url, max_attempts, max_attempts * retry_interval, + url, + max_attempts, + max_attempts * retry_interval, ) return False -- Gitee From 38e91fbb7fa301261ff65b9bd1edec6e171f94bd Mon Sep 17 00:00:00 2001 From: Hongyu Shi Date: Mon, 8 Sep 2025 10:41:36 +0800 Subject: [PATCH 7/7] =?UTF-8?q?refactor(deploy-agent):=20=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E6=8B=89=E8=B5=B7=20MCP=20=E6=9C=8D=E5=8A=A1=E5=90=8E?= =?UTF-8?q?=E9=AA=8C=E8=AF=81=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Hongyu Shi --- src/app/deployment/agent.py | 142 ++++++++++++++++++++++-------------- 1 file changed, 88 insertions(+), 54 deletions(-) diff --git a/src/app/deployment/agent.py b/src/app/deployment/agent.py index 28dfc0a..de9788d 100644 --- a/src/app/deployment/agent.py +++ b/src/app/deployment/agent.py @@ -139,7 +139,7 @@ class ApiClient: def __init__(self, server_ip: str, server_port: int) -> None: """初始化 API 客户端""" self.base_url = f"http://{server_ip}:{server_port}" - self.timeout = 60.0 # httpx 使用浮点数作为超时 + self.timeout = 10.0 async def register_mcp_service(self, config: McpConfig) -> str: """注册 MCP 服务""" @@ -812,39 +812,62 @@ class AgentManager: if failed_services: self._report_progress( state, - f"[yellow]部分服务状态异常: {', '.join(failed_services)},但继续执行[/yellow]", + f"[red]关键服务状态异常: {', '.join(failed_services)},停止初始化[/red]", callback, ) - logger.warning("部分服务状态异常,但继续执行: %s", failed_services) + logger.error("关键服务状态异常,停止初始化: %s", failed_services) + return False self._report_progress(state, "[green]MCP Server 服务验证完成[/green]", callback) - return True # 即使有服务异常也继续执行 + return True async def _verify_single_service( self, service_file: Path, state: DeploymentState, callback: Callable[[DeploymentState], None] | None, + retry_count: int = 0, ) -> tuple[bool, str]: """验证单个服务状态""" + await asyncio.sleep(2) + service_name = service_file.stem # 去掉 .service 后缀 - self._report_progress( - state, - f" [magenta]检查服务状态: {service_name}[/magenta]", - callback, - ) + + # 限制递归次数,最多重试6次(30秒) + max_retries = 6 + if retry_count > max_retries: + self._report_progress( + state, + f" [red]{service_name} 启动超时 (30秒)[/red]", + callback, + ) + logger.error("服务启动超时: %s", service_name) + return False, service_name + + if retry_count == 0: + self._report_progress( + state, + f" [magenta]检查服务状态: {service_name}[/magenta]", + callback, + ) + else: + self._report_progress( + state, + f" [dim]{service_name} 重新检查状态... (第 {retry_count} 次)[/dim]", + callback, + ) try: - # 检查服务状态 - cmd = f"systemctl is-active {service_name}" + # 使用 systemctl status 获取详细状态信息 + cmd = f"systemctl status {service_name}" process = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) - stdout, _ = await process.communicate() - status = stdout.decode("utf-8").strip() if stdout else "" + stdout, stderr = await process.communicate() + output = stdout.decode("utf-8") if stdout else "" except Exception: self._report_progress( @@ -855,21 +878,48 @@ class AgentManager: logger.exception("检查服务状态失败: %s", service_name) return False, service_name else: - if status == "active": + # systemctl status 返回码: 0=active, 1=dead, 2=unknown, 3=not-found, 4=permission-denied + if process.returncode == 0 and "active (running)" in output.lower(): + # 服务正常运行 self._report_progress( state, - f" [green]{service_name} 状态正常[/green]", + f" [green]{service_name} 状态正常 (active running)[/green]", callback, ) logger.info("服务状态正常: %s", service_name) return True, service_name + # 分析输出内容,检查是否有失败信息 + if "failed" in output.lower() or "code=exited" in output.lower(): + self._report_progress( + state, + f" [red]{service_name} 服务启动失败[/red]", + callback, + ) + logger.error("服务启动失败: %s, 详细信息: %s", service_name, output.strip()) + return False, service_name + + # 检查是否真的在启动中(activating 状态) + if "activating" in output.lower() and "start" in output.lower(): + if retry_count == 0: + self._report_progress( + state, + f" [yellow]{service_name} 正在启动中,等待启动完成...[/yellow]", + callback, + ) + logger.info("服务正在启动中,等待启动完成: %s", service_name) + + # 等待3秒后递归调用自己 + await asyncio.sleep(3) + return await self._verify_single_service(service_file, state, callback, retry_count + 1) + + # 其他状态都认为是异常 self._report_progress( state, - f" [red]{service_name} 状态异常: {status}[/red]", + f" [red]{service_name} 状态异常 (返回码: {process.returncode})[/red]", callback, ) - logger.warning("服务状态异常: %s -> %s", service_name, status) + logger.warning("服务状态异常: %s, 返回码: %d, 输出: %s", service_name, process.returncode, output.strip()) return False, service_name async def _register_all_mcp_services( @@ -1221,50 +1271,34 @@ class AgentManager: ) # 重试配置 - max_attempts = 36 # 3分钟 / 5秒 = 36次 + max_attempts = 6 # 30秒 / 5秒 = 6次 retry_interval = 5 # 5秒重试间隔 for attempt in range(1, max_attempts + 1): try: - async with httpx.AsyncClient(timeout=self.api_client.timeout) as client: - response = await client.get( + # 使用流式请求,只读取响应头,避免 SSE 连接一直保持开放 + async with ( + httpx.AsyncClient(timeout=self.api_client.timeout) as client, + client.stream("GET", url, headers={"Accept": "text/event-stream"}) as response, + ): + if response.status_code == HTTP_OK: + # 验证成功 + self._report_progress( + state, + f" [green]{config.name} SSE Endpoint 验证通过[/green]", + callback, + ) + logger.info("SSE Endpoint 验证成功: %s (尝试 %d 次)", url, attempt) + return True + + logger.debug( + "SSE Endpoint 响应码非 200: %s, 状态码: %d, 尝试: %d/%d", url, - headers={"Accept": "text/event-stream"}, + response.status_code, + attempt, + max_attempts, ) - if response.status_code != HTTP_OK: - logger.debug( - "SSE Endpoint 响应码非 200: %s, 状态码: %d, 尝试: %d/%d", - url, - response.status_code, - attempt, - max_attempts, - ) - else: - content_type = response.headers.get("content-type", "") - if "text/event-stream" not in content_type: - self._report_progress( - state, - f" [yellow]{config.name} Content-Type 非 SSE: {content_type}[/yellow]", - callback, - ) - logger.debug( - "SSE Endpoint Content-Type 非 SSE: %s, Content-Type: %s, 尝试: %d/%d", - url, - content_type, - attempt, - max_attempts, - ) - else: - # 验证成功 - self._report_progress( - state, - f" [green]{config.name} SSE Endpoint 验证通过[/green]", - callback, - ) - logger.info("SSE Endpoint 验证成功: %s (尝试 %d 次)", url, attempt) - return True - except (httpx.RequestError, httpx.HTTPStatusError) as e: logger.debug("SSE Endpoint 连接失败: %s, 错误: %s, 尝试: %d/%d", url, e, attempt, max_attempts) -- Gitee