From 594e49671ac45a02548a3d57407d98bc5d0d125c Mon Sep 17 00:00:00 2001 From: Hongyu Shi Date: Tue, 19 Aug 2025 09:38:10 +0800 Subject: [PATCH 1/5] =?UTF-8?q?feat(deploy):=20=E6=B7=BB=E5=8A=A0=E6=99=BA?= =?UTF-8?q?=E8=83=BD=E4=BD=93=E5=88=9D=E5=A7=8B=E5=8C=96=E5=8A=9F=E8=83=BD?= =?UTF-8?q?;=20=E6=9B=B4=E6=96=B0=E9=85=8D=E7=BD=AE=E7=AE=A1=E7=90=86?= =?UTF-8?q?=E5=99=A8=E4=BB=A5=E6=94=AF=E6=8C=81=E9=BB=98=E8=AE=A4=E6=99=BA?= =?UTF-8?q?=E8=83=BD=E4=BD=93=20ID?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Hongyu Shi --- src/app/deployment/agent.py | 488 ++++++++++++++++++++++++++++++++++ src/app/deployment/service.py | 132 +-------- src/config/manager.py | 9 + src/config/model.py | 8 +- 4 files changed, 514 insertions(+), 123 deletions(-) create mode 100644 src/app/deployment/agent.py diff --git a/src/app/deployment/agent.py b/src/app/deployment/agent.py new file mode 100644 index 0000000..ee59911 --- /dev/null +++ b/src/app/deployment/agent.py @@ -0,0 +1,488 @@ +""" +Agent 管理模块。 + +处理 MCP 服务和智能体的注册、安装、激活和管理。 + +该模块提供: +- McpConfig: MCP 配置数据模型 +- McpConfigLoader: MCP 配置文件加载器 +- ApiClient: HTTP API 客户端 +- AgentManager: 智能体管理器主类 +""" + +from __future__ import annotations + +import asyncio +import json +from dataclasses import dataclass +from pathlib import Path +from typing import TYPE_CHECKING, Any + +import httpx + +from config.manager import ConfigManager +from log.manager import get_logger + +from .models import DeploymentState + +if TYPE_CHECKING: + from collections.abc import Callable + +logger = get_logger(__name__) + +# HTTP 状态码常量 +HTTP_OK = 200 + + +class ConfigError(Exception): + """配置错误异常""" + + +class ApiError(Exception): + """API 错误异常""" + + +@dataclass +class McpConfig: + """MCP 配置模型""" + + name: str + description: str + overview: str + config: dict[str, Any] + mcp_type: str + + +@dataclass +class McpServerInfo: + """MCP 服务信息""" + + service_id: str + name: str + config_path: Path + config: McpConfig + + +@dataclass +class AgentInfo: + """智能体信息""" + + app_id: str + name: str + description: str + mcp_services: list[str] + + +class McpConfigLoader: + """MCP 配置加载器""" + + def __init__(self, config_dir: Path) -> None: + """初始化配置加载器""" + self.config_dir = config_dir + + def load_all_configs(self) -> list[tuple[Path, McpConfig]]: + """加载所有 MCP 配置""" + configs = [] + if not self.config_dir.exists(): + msg = f"配置目录不存在: {self.config_dir}" + logger.error(msg) + raise ConfigError(msg) + + for subdir in self.config_dir.iterdir(): + if subdir.is_dir(): + config_file = subdir / "config.json" + if config_file.exists(): + try: + config = self._load_config(config_file, subdir.name) + configs.append((config_file, config)) + logger.info("加载 MCP 配置: %s", subdir.name) + except (json.JSONDecodeError, KeyError): + logger.exception("加载配置文件失败: %s", config_file) + continue + + if not configs: + msg = f"未找到有效的 MCP 配置文件在: {self.config_dir}" + logger.warning(msg) + + return configs + + def _load_config(self, config_file: Path, name: str) -> McpConfig: + """加载单个配置文件""" + with config_file.open(encoding="utf-8") as f: + config_data = json.load(f) + + return McpConfig( + name=name, + description=name, + overview=name, + config=config_data, + mcp_type="sse", # 默认类型 + ) + + +class ApiClient: + """API 客户端""" + + def __init__(self, server_ip: str, server_port: int) -> None: + """初始化 API 客户端""" + self.base_url = f"http://{server_ip}:{server_port}" + self.timeout = 60.0 # httpx 使用浮点数作为超时 + + async def register_mcp_service(self, config: McpConfig) -> str: + """注册 MCP 服务""" + url = f"{self.base_url}/api/mcp" + payload = { + "name": config.name, + "description": config.description, + "overview": config.overview, + "config": config.config, + "mcpType": config.mcp_type, + } + + logger.info("注册 MCP 服务: %s", config.name) + async with httpx.AsyncClient(timeout=self.timeout) as client: + try: + response = await client.post(url, json=payload) + response.raise_for_status() + + result = response.json() + if result.get("code") != HTTP_OK: + msg = f"注册 MCP 服务失败: {result.get('message', 'Unknown error')}" + logger.error(msg) + raise ApiError(msg) + + service_id = result["result"]["serviceId"] + logger.info("MCP 服务注册成功: %s -> %s", config.name, service_id) + + except httpx.RequestError as e: + msg = f"注册 MCP 服务网络错误: {e}" + logger.exception(msg) + raise ApiError(msg) from e + + else: + return service_id + + async def install_mcp_service(self, service_id: str) -> None: + """安装 MCP 服务""" + url = f"{self.base_url}/api/mcp/{service_id}/install?install=true" + + logger.info("安装 MCP 服务: %s", service_id) + async with httpx.AsyncClient(timeout=self.timeout) as client: + try: + response = await client.post(url) + response.raise_for_status() + logger.info("MCP 服务安装请求已发送: %s", service_id) + except httpx.RequestError as e: + msg = f"安装 MCP 服务网络错误: {e}" + logger.exception(msg) + raise ApiError(msg) from e + + async def check_mcp_service_status(self, service_id: str) -> bool: + """检查 MCP 服务状态""" + url = f"{self.base_url}/api/mcp/{service_id}" + + async with httpx.AsyncClient(timeout=self.timeout) as client: + try: + response = await client.get(url) + response.raise_for_status() + + result = response.json() + # 假设安装成功的标志是 code == 200 + return result.get("code") == HTTP_OK + + except httpx.RequestError: + return False + + async def wait_for_installation( + self, + service_id: str, + max_wait_time: int = 300, + check_interval: int = 10, + ) -> bool: + """等待 MCP 服务安装完成""" + logger.info("等待 MCP 服务安装完成: %s", service_id) + + for attempt in range(max_wait_time // check_interval): + if await self.check_mcp_service_status(service_id): + logger.info("MCP 服务安装完成: %s", service_id) + return True + + logger.debug("MCP 服务 %s 安装中... (第 %d 次检查)", service_id, attempt + 1) + await asyncio.sleep(check_interval) + + logger.error("MCP 服务安装超时: %s", service_id) + return False + + async def activate_mcp_service(self, service_id: str) -> None: + """激活 MCP 服务""" + url = f"{self.base_url}/api/mcp/{service_id}" + payload = {"active": True} + + logger.info("激活 MCP 服务: %s", service_id) + async with httpx.AsyncClient(timeout=self.timeout) as client: + try: + response = await client.post(url, json=payload) + response.raise_for_status() + + result = response.json() + if result.get("code") != HTTP_OK: + msg = f"激活 MCP 服务失败: {result.get('message', 'Unknown error')}" + logger.error(msg) + raise ApiError(msg) + + logger.info("MCP 服务激活成功: %s", service_id) + + except httpx.RequestError as e: + msg = f"激活 MCP 服务网络错误: {e}" + logger.exception(msg) + raise ApiError(msg) from e + + async def create_agent( + self, + name: str, + description: str, + mcp_service_ids: list[str], + ) -> str: + """创建智能体""" + url = f"{self.base_url}/api/app" + payload = { + "appType": "agent", + "name": name, + "description": description, + "mcpService": mcp_service_ids, + } + + logger.info("创建智能体: %s (包含 %d 个 MCP 服务)", name, len(mcp_service_ids)) + async with httpx.AsyncClient(timeout=self.timeout) as client: + try: + response = await client.post(url, json=payload) + response.raise_for_status() + + result = response.json() + if result.get("code") != HTTP_OK: + msg = f"创建智能体失败: {result.get('message', 'Unknown error')}" + logger.error(msg) + raise ApiError(msg) + + app_id = result["result"]["appId"] + logger.info("智能体创建成功: %s -> %s", name, app_id) + + except httpx.RequestError as e: + msg = f"创建智能体网络错误: {e}" + logger.exception(msg) + raise ApiError(msg) from e + + else: + return app_id + + async def publish_agent(self, app_id: str) -> None: + """发布智能体""" + url = f"{self.base_url}/api/app/{app_id}" + + logger.info("发布智能体: %s", app_id) + async with httpx.AsyncClient(timeout=self.timeout) as client: + try: + response = await client.post(url) + response.raise_for_status() + + result = response.json() + if result.get("code") != HTTP_OK: + msg = f"发布智能体失败: {result.get('message', 'Unknown error')}" + logger.error(msg) + raise ApiError(msg) + + logger.info("智能体发布成功: %s", app_id) + + except httpx.RequestError as e: + msg = f"发布智能体网络错误: {e}" + logger.exception(msg) + raise ApiError(msg) from e + + +class AgentManager: + """智能体管理器""" + + def __init__(self, server_ip: str = "127.0.0.1", server_port: int = 8002) -> None: + """初始化智能体管理器""" + self.api_client = ApiClient(server_ip, server_port) + self.config_manager = ConfigManager() + self.mcp_config_dir = Path("/usr/lib/openeuler-intelligence/scripts/5-resource/mcp_config") + + async def initialize_agents( + self, + progress_callback: Callable[[DeploymentState], None] | None = None, + ) -> bool: + """初始化智能体""" + state = DeploymentState() + self._report_progress(state, "🚀 开始初始化智能体...", progress_callback) + + try: + # 加载配置 + configs = await self._load_mcp_configs(state, progress_callback) + if not configs: + return False + + # 处理 MCP 服务 + service_ids = await self._process_all_mcp_services(configs, state, progress_callback) + if not service_ids: + self._report_progress(state, "❌ 所有 MCP 服务处理失败", progress_callback) + return False + + # 创建智能体 + app_id = await self._create_and_publish_agent(service_ids, state, progress_callback) + + self._report_progress( + state, + f"🎉 智能体初始化完成! App ID: {app_id}", + progress_callback, + ) + logger.info("智能体初始化成功完成,App ID: %s", app_id) + + except Exception as e: + error_msg = f"智能体初始化失败: {e}" + self._report_progress(state, f"❌ {error_msg}", progress_callback) + logger.exception(error_msg) + return False + + else: + return True + + def _report_progress( + self, + state: DeploymentState, + message: str, + callback: Callable[[DeploymentState], None] | None = None, + ) -> None: + """报告进度""" + state.add_log(message) + if callback: + callback(state) + + async def _load_mcp_configs( + self, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> list[tuple[Path, McpConfig]]: + """加载 MCP 配置""" + self._report_progress(state, "📋 加载 MCP 配置文件...", callback) + + config_loader = McpConfigLoader(self.mcp_config_dir) + configs = config_loader.load_all_configs() + + if not configs: + self._report_progress(state, "⚠️ 未找到 MCP 配置文件", callback) + return [] + + self._report_progress(state, f"✅ 成功加载 {len(configs)} 个 MCP 配置", callback) + return configs + + async def _process_all_mcp_services( + self, + configs: list[tuple[Path, McpConfig]], + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> list[str]: + """处理所有 MCP 服务""" + service_ids = [] + for _config_path, config in configs: + self._report_progress(state, f"🔧 处理 MCP 服务: {config.name}", callback) + + service_id = await self._process_mcp_service(config, state, callback) + if service_id: + service_ids.append(service_id) + else: + self._report_progress(state, f"❌ MCP 服务 {config.name} 处理失败", callback) + + return service_ids + + async def _create_and_publish_agent( + self, + service_ids: list[str], + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> str: + """创建并发布智能体""" + self._report_progress( + state, + f"🤖 创建智能体 (包含 {len(service_ids)} 个 MCP 服务)", + callback, + ) + + app_id = await self.api_client.create_agent( + "OS 智能助手", + "OS 智能助手", + service_ids, + ) + + await self.api_client.publish_agent(app_id) + + self._report_progress(state, "💾 保存智能体配置...", callback) + self.config_manager.set_default_app(app_id) + + return app_id + + async def _register_mcp_service( + self, + config: McpConfig, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> str: + """注册 MCP 服务""" + self._report_progress(state, f" 📝 注册 {config.name}...", callback) + return await self.api_client.register_mcp_service(config) + + async def _install_and_wait_mcp_service( + self, + service_id: str, + config_name: str, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> bool: + """安装并等待 MCP 服务完成""" + self._report_progress(state, f" ⬇️ 安装 {config_name} (ID: {service_id})...", callback) + await self.api_client.install_mcp_service(service_id) + + self._report_progress(state, f" ⏳ 等待 {config_name} 安装完成...", callback) + if not await self.api_client.wait_for_installation(service_id): + self._report_progress(state, f" ❌ {config_name} 安装超时", callback) + return False + + return True + + async def _activate_mcp_service( + self, + service_id: str, + config_name: str, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> None: + """激活 MCP 服务""" + self._report_progress(state, f" 🔄 激活 {config_name}...", callback) + await self.api_client.activate_mcp_service(service_id) + self._report_progress(state, f" ✅ {config_name} 处理完成", callback) + + async def _process_mcp_service( + self, + config: McpConfig, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> str | None: + """处理单个 MCP 服务""" + try: + # 注册服务 + service_id = await self._register_mcp_service(config, state, callback) + + # 安装并等待完成 + if not await self._install_and_wait_mcp_service(service_id, config.name, state, callback): + return None + + # 激活服务 + await self._activate_mcp_service(service_id, config.name, state, callback) + + except (ApiError, httpx.RequestError, Exception) as e: + self._report_progress(state, f" ❌ {config.name} 处理失败: {e}", callback) + logger.exception("MCP 服务 %s 处理失败", config.name) + return None + + else: + return service_id diff --git a/src/app/deployment/service.py b/src/app/deployment/service.py index a4736bb..3a68ce1 100644 --- a/src/app/deployment/service.py +++ b/src/app/deployment/service.py @@ -17,6 +17,7 @@ import toml from log.manager import get_logger +from .agent import AgentManager from .models import DeploymentConfig, DeploymentState if TYPE_CHECKING: @@ -38,7 +39,7 @@ class DeploymentResourceManager: CONFIG_TEMPLATE = RESOURCE_PATH / "config.toml" # 系统配置文件路径 - INSTALL_MODEL_FILE = Path("/etc/euler_Intelligence_install_model") + INSTALL_MODE_FILE = Path("/etc/euler_Intelligence_install_mode") @classmethod def check_installer_available(cls) -> bool: @@ -372,7 +373,7 @@ class DeploymentService: self._generate_config_files, self._setup_deploy_mode, self._run_init_config_script, - self._run_agent_init_script, + self._run_agent_init, ] # 依次执行每个步骤 @@ -645,7 +646,7 @@ class DeploymentService: cmd = [ "sudo", "tee", - str(self.resource_manager.INSTALL_MODEL_FILE), + str(self.resource_manager.INSTALL_MODE_FILE), ] process = await asyncio.create_subprocess_exec( @@ -797,128 +798,15 @@ class DeploymentService: logger.warning("读取进程输出时发生错误: %s", e) break - async def _run_agent_init_script( + async def _run_agent_init( self, config: DeploymentConfig, progress_callback: Callable[[DeploymentState], None] | None, ) -> bool: """运行 Agent 初始化脚本""" - temp_state = DeploymentState() - temp_state.current_step = 7 - temp_state.total_steps = 7 - temp_state.current_step_name = "Agent 初始化" - temp_state.add_log("开始 Agent 初始化...") - - if progress_callback: - progress_callback(temp_state) - - # 检查脚本文件存在性 - agent_script_path = Path("/usr/lib/openeuler-intelligence/scripts/4-other-script/agent_manager.py") - if not agent_script_path.exists(): - temp_state.add_log("错误: Agent 管理脚本不存在") - logger.error("Agent 脚本不存在: %s", agent_script_path) - return False - - # 检查配置文件存在性 - config_file_path = self.resource_manager.CONFIG_TEMPLATE - if not config_file_path.exists(): - temp_state.add_log("错误: 配置文件不存在") - logger.error("配置文件不存在: %s", config_file_path) - return False - - try: - return await self._execute_agent_init_command( - agent_script_path, - config_file_path, - temp_state, - progress_callback, - ) - except Exception as e: - temp_state.add_log(f"❌ Agent 初始化异常: {e!s}") - logger.exception("Agent 初始化过程中发生异常") - if progress_callback: - progress_callback(temp_state) - return False - - async def _execute_agent_init_command( - self, - agent_script_path: Path, - config_file_path: Path, - temp_state: DeploymentState, - progress_callback: Callable[[DeploymentState], None] | None, - ) -> bool: - """执行 Agent 初始化命令""" - # 构建 Agent 初始化命令(默认执行 comb 操作) - # 根据 agent_manager.py 的参数定义:operator 和 config_path 是位置参数 - # 由于脚本需要访问系统级目录,使用 sudo 权限执行 - cmd = [ - "sudo", - "python3", - str(agent_script_path), - "comb", - str(config_file_path), - ] - - temp_state.add_log(f"执行命令: {' '.join(cmd)}") - logger.info("执行 Agent 初始化命令: %s", " ".join(cmd)) - - # 执行脚本 - process = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, - cwd=agent_script_path.parent, - ) + # 使用配置中的服务器 IP 和默认端口 + server_ip = config.server_ip or "127.0.0.1" + agent_manager = AgentManager(server_ip=server_ip, server_port=8002) - # 读取并处理输出 - output_lines = await self._process_agent_init_output(process, temp_state, progress_callback) - - # 等待进程结束并处理结果 - return_code = await process.wait() - return self._handle_agent_init_result(return_code, output_lines, temp_state, progress_callback) - - async def _process_agent_init_output( - self, - process: asyncio.subprocess.Process, - temp_state: DeploymentState, - progress_callback: Callable[[DeploymentState], None] | None, - ) -> list[str]: - """处理 Agent 初始化输出""" - output_lines = [] - if process.stdout: - while True: - line = await process.stdout.readline() - if not line: - break - - decoded_line = line.decode("utf-8", errors="ignore").strip() - if decoded_line: - output_lines.append(decoded_line) - temp_state.add_log(f"Agent初始化: {decoded_line}") - if progress_callback: - progress_callback(temp_state) - - return output_lines - - def _handle_agent_init_result( - self, - return_code: int, - output_lines: list[str], - temp_state: DeploymentState, - progress_callback: Callable[[DeploymentState], None] | None, - ) -> bool: - """处理 Agent 初始化结果""" - if return_code == 0: - temp_state.add_log("✅ Agent 初始化完成") - logger.info("Agent 初始化成功完成") - if progress_callback: - progress_callback(temp_state) - return True - - temp_state.add_log(f"❌ Agent 初始化失败 (退出码: {return_code})") - logger.error("Agent 初始化失败,退出码: %s", return_code) - for line in output_lines[-5:]: # 显示最后5行错误信息 - temp_state.add_log(f"错误详情: {line}") - if progress_callback: - progress_callback(temp_state) - return False + # 初始化 Agent 和 MCP 服务 + return await agent_manager.initialize_agents(progress_callback) diff --git a/src/config/manager.py b/src/config/manager.py index f093ea8..2c7264d 100644 --- a/src/config/manager.py +++ b/src/config/manager.py @@ -83,6 +83,15 @@ class ConfigManager: self.data.log_level = level self._save_settings() + def get_default_app(self) -> str: + """获取当前默认智能体 ID""" + return self.data.eulerintelli.default_app + + def set_default_app(self, app_id: str) -> None: + """更新默认智能体 ID 并保存""" + self.data.eulerintelli.default_app = app_id + self._save_settings() + def _load_settings(self) -> None: """从文件载入设置""" if self.config_path.exists(): diff --git a/src/config/model.py b/src/config/model.py index 3c3b766..ae67373 100644 --- a/src/config/model.py +++ b/src/config/model.py @@ -56,6 +56,7 @@ class HermesConfig: base_url: str = field(default="https://www.eulerintelli.com") api_key: str = field(default="your-eulerintelli-api-key") + default_app: str = field(default="") @classmethod def from_dict(cls, d: dict) -> "HermesConfig": @@ -63,11 +64,16 @@ class HermesConfig: return cls( base_url=d.get("base_url", cls.base_url), api_key=d.get("api_key", cls.api_key), + default_app=d.get("default_app", cls.default_app), ) def to_dict(self) -> dict: """转换为字典""" - return {"base_url": self.base_url, "api_key": self.api_key} + return { + "base_url": self.base_url, + "api_key": self.api_key, + "default_app": self.default_app, + } @dataclass -- Gitee From bbf0b067626f6e852c474989259b1fbe9638aff1 Mon Sep 17 00:00:00 2001 From: Hongyu Shi Date: Tue, 19 Aug 2025 10:03:05 +0800 Subject: [PATCH 2/5] =?UTF-8?q?feat(deploy):=20=E6=B7=BB=E5=8A=A0=20framew?= =?UTF-8?q?ork=20=E6=9C=8D=E5=8A=A1=E5=81=A5=E5=BA=B7=E6=A3=80=E6=9F=A5?= =?UTF-8?q?=E5=8A=9F=E8=83=BD=E4=BB=A5=E5=A2=9E=E5=BC=BA=20Agent=20?= =?UTF-8?q?=E5=88=9D=E5=A7=8B=E5=8C=96=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Hongyu Shi --- src/app/deployment/service.py | 127 +++++++++++++++++++++++++++++++++- 1 file changed, 125 insertions(+), 2 deletions(-) diff --git a/src/app/deployment/service.py b/src/app/deployment/service.py index 3a68ce1..302e4ec 100644 --- a/src/app/deployment/service.py +++ b/src/app/deployment/service.py @@ -13,6 +13,7 @@ import re from pathlib import Path from typing import TYPE_CHECKING +import httpx import toml from log.manager import get_logger @@ -798,15 +799,137 @@ class DeploymentService: logger.warning("读取进程输出时发生错误: %s", e) break + async def _check_framework_service_health( + self, + server_ip: str, + server_port: int, + progress_callback: Callable[[DeploymentState], None] | None, + ) -> bool: + """检查 framework 服务健康状态""" + # 1. 检查 systemctl framework 服务状态 + if not await self._check_systemctl_service_status(progress_callback): + return False + + # 2. 检查 HTTP API 接口连通性 + return await self._check_framework_api_health(server_ip, server_port, progress_callback) + + async def _check_systemctl_service_status( + self, + progress_callback: Callable[[DeploymentState], None] | None, + ) -> bool: + """检查 systemctl framework 服务状态,每2秒检查一次,5次后超时""" + max_attempts = 5 + check_interval = 2.0 # 2秒 + + for attempt in range(1, max_attempts + 1): + self.state.add_log(f"检查 framework 服务状态 ({attempt}/{max_attempts})...") + + if progress_callback: + progress_callback(self.state) + + try: + # 使用 systemctl is-active 检查服务状态 + process = await asyncio.create_subprocess_exec( + "systemctl", + "is-active", + "framework", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + stdout, stderr = await process.communicate() + status = stdout.decode("utf-8").strip() + + if process.returncode == 0 and status == "active": + self.state.add_log("✓ Framework 服务状态正常") + return True + + self.state.add_log(f"Framework 服务状态: {status}") + + if attempt < max_attempts: + self.state.add_log(f"等待 {check_interval} 秒后重试...") + await asyncio.sleep(check_interval) + + except (OSError, TimeoutError) as e: + self.state.add_log(f"检查服务状态时发生错误: {e}") + if attempt < max_attempts: + await asyncio.sleep(check_interval) + + self.state.add_log("✗ Framework 服务状态检查超时失败") + return False + + async def _check_framework_api_health( + self, + server_ip: str, + server_port: int, + progress_callback: Callable[[DeploymentState], None] | None, + ) -> bool: + """检查 framework API 健康状态,每10秒检查一次,5分钟后超时""" + max_attempts = 30 + check_interval = 10.0 # 10秒 + api_url = f"http://{server_ip}:{server_port}/api/user" + http_ok = 200 # HTTP OK 状态码 + + async with httpx.AsyncClient(timeout=httpx.Timeout(5.0)) as client: + self.state.add_log("等待 openEuler Intelligence 服务就绪") + for attempt in range(1, max_attempts + 1): + if progress_callback: + progress_callback(self.state) + + try: + response = await client.get(api_url) + + if response.status_code == http_ok: + self.state.add_log("✓ openEuler Intelligence 服务已就绪") + return True + + except httpx.ConnectError: + self.state.add_log(f"无法连接到 {api_url}") + except httpx.TimeoutException: + self.state.add_log(f"连接 {api_url} 超时") + except (httpx.RequestError, OSError) as e: + self.state.add_log(f"API 连通性检查时发生错误: {e}") + + if attempt < max_attempts: + await asyncio.sleep(check_interval) + + self.state.add_log("✗ openEuler Intelligence API 服务检查超时失败") + return False + async def _run_agent_init( self, config: DeploymentConfig, progress_callback: Callable[[DeploymentState], None] | None, ) -> bool: """运行 Agent 初始化脚本""" + self.state.current_step = 7 + self.state.current_step_name = "初始化 Agent 服务" + self.state.add_log("正在检查 framework 服务连通性...") + + if progress_callback: + progress_callback(self.state) + # 使用配置中的服务器 IP 和默认端口 server_ip = config.server_ip or "127.0.0.1" - agent_manager = AgentManager(server_ip=server_ip, server_port=8002) + server_port = 8002 + + # 检查 framework 服务连通性 + if not await self._check_framework_service_health(server_ip, server_port, progress_callback): + self.state.add_log("✗ Framework 服务连通性检查失败") + return False + + self.state.add_log("✓ Framework 服务连通性检查通过,开始初始化 Agent...") + + if progress_callback: + progress_callback(self.state) # 初始化 Agent 和 MCP 服务 - return await agent_manager.initialize_agents(progress_callback) + agent_manager = AgentManager(server_ip=server_ip, server_port=server_port) + success = await agent_manager.initialize_agents(progress_callback) + + if success: + self.state.add_log("✓ Agent 初始化完成") + else: + self.state.add_log("✗ Agent 初始化失败") + + return success -- Gitee From 7b415f35f0ed0ea382cbfb0851b235f493fde699 Mon Sep 17 00:00:00 2001 From: Hongyu Shi Date: Tue, 19 Aug 2025 10:57:17 +0800 Subject: [PATCH 3/5] =?UTF-8?q?feat(deploy):=20=E6=9B=B4=E6=96=B0=20MCP=20?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E7=8A=B6=E6=80=81=E6=A3=80=E6=9F=A5=E5=92=8C?= =?UTF-8?q?=E5=AE=89=E8=A3=85=E7=AD=89=E5=BE=85=E9=80=BB=E8=BE=91;=20?= =?UTF-8?q?=E6=A0=B9=E6=8D=AE=E9=83=A8=E7=BD=B2=E6=A8=A1=E5=BC=8F=E8=B0=83?= =?UTF-8?q?=E6=95=B4=E6=80=BB=E6=AD=A5=E9=AA=A4=E5=92=8C=E7=94=A8=E6=88=B7?= =?UTF-8?q?=E6=8F=90=E7=A4=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Hongyu Shi --- src/app/deployment/agent.py | 72 ++++++++++++++++++++++++++++------- src/app/deployment/service.py | 30 ++++++++++----- src/app/deployment/ui.py | 6 +-- 3 files changed, 83 insertions(+), 25 deletions(-) diff --git a/src/app/deployment/agent.py b/src/app/deployment/agent.py index ee59911..71544de 100644 --- a/src/app/deployment/agent.py +++ b/src/app/deployment/agent.py @@ -177,8 +177,18 @@ class ApiClient: logger.exception(msg) raise ApiError(msg) from e - async def check_mcp_service_status(self, service_id: str) -> bool: - """检查 MCP 服务状态""" + async def check_mcp_service_status(self, service_id: str) -> str | None: + """ + 检查 MCP 服务状态 + + 返回值: + - "ready": 安装完成且成功 + - "failed": 安装失败 + - "cancelled": 安装取消 + - "init": 初始化中 + - "installing": 安装中 + - None: 网络错误或无法获取状态 + """ url = f"{self.base_url}/api/mcp/{service_id}" async with httpx.AsyncClient(timeout=self.timeout) as client: @@ -187,11 +197,24 @@ class ApiClient: response.raise_for_status() result = response.json() - # 假设安装成功的标志是 code == 200 - return result.get("code") == HTTP_OK + # 检查 API 调用是否成功 + if result.get("code") != HTTP_OK: + logger.warning("获取 MCP 服务状态失败: %s", result.get("message", "Unknown error")) + return None - except httpx.RequestError: - return False + # 获取服务状态 + service_result = result.get("result", {}) + status = service_result.get("status") + + if status in ("ready", "failed", "cancelled", "init", "installing"): + return status + + logger.warning("未知的 MCP 服务状态: %s", status) + + except httpx.RequestError as e: + logger.debug("检查 MCP 服务状态网络错误: %s", e) + + return None async def wait_for_installation( self, @@ -199,19 +222,42 @@ class ApiClient: max_wait_time: int = 300, check_interval: int = 10, ) -> bool: - """等待 MCP 服务安装完成""" + """ + 等待 MCP 服务安装完成 + + 只要接口能打通、后端返回的状态没有明确成功或失败或取消,就会一直等下去。 + 只有在明确失败或取消时才返回 False。 + """ logger.info("等待 MCP 服务安装完成: %s", service_id) - for attempt in range(max_wait_time // check_interval): - if await self.check_mcp_service_status(service_id): + attempt = 0 + while True: + status = await self.check_mcp_service_status(service_id) + + if status == "ready": logger.info("MCP 服务安装完成: %s", service_id) return True - logger.debug("MCP 服务 %s 安装中... (第 %d 次检查)", service_id, attempt + 1) - await asyncio.sleep(check_interval) + if status in ("failed", "cancelled"): + logger.error("MCP 服务安装失败或被取消: %s (状态: %s)", service_id, status) + return False - logger.error("MCP 服务安装超时: %s", service_id) - return False + if status in ("init", "installing"): + logger.debug("MCP 服务 %s %s中... (第 %d 次检查)", service_id, + "初始化" if status == "init" else "安装", attempt + 1) + elif status is None: + logger.debug("MCP 服务 %s 状态检查失败,继续等待... (第 %d 次检查)", service_id, attempt + 1) + else: + logger.debug("MCP 服务 %s 状态未知: %s,继续等待... (第 %d 次检查)", service_id, status, attempt + 1) + + # 只有在超过最大等待时间时才超时返回,但仅在没有明确失败的情况下 + attempt += 1 + if attempt * check_interval >= max_wait_time: + # 这里不返回 False,而是继续等待,因为要求只要接口能打通就一直等 + logger.warning("MCP 服务安装等待超时: %s (已等待 %d 秒,但将继续尝试)", + service_id, max_wait_time) + + await asyncio.sleep(check_interval) async def activate_mcp_service(self, service_id: str) -> None: """激活 MCP 服务""" diff --git a/src/app/deployment/service.py b/src/app/deployment/service.py index 302e4ec..49ffe27 100644 --- a/src/app/deployment/service.py +++ b/src/app/deployment/service.py @@ -272,7 +272,8 @@ class DeploymentService: # 重置状态 self.state.reset() self.state.is_running = True - self.state.total_steps = 7 + # 根据部署模式设置总步数:轻量模式7步,全量模式6步 + self.state.total_steps = 7 if config.deployment_mode == "light" else 6 # 执行部署步骤 success = await self._execute_deployment_steps(config, progress_callback) @@ -366,7 +367,7 @@ class DeploymentService: progress_callback: Callable[[DeploymentState], None] | None, ) -> bool: """执行所有部署步骤""" - # 定义部署步骤 + # 定义基础部署步骤 steps = [ self._check_environment, self._run_env_check_script, @@ -374,14 +375,23 @@ class DeploymentService: self._generate_config_files, self._setup_deploy_mode, self._run_init_config_script, - self._run_agent_init, ] + # 轻量化部署模式下才自动执行 Agent 初始化 + if config.deployment_mode == "light": + steps.append(self._run_agent_init) + # 依次执行每个步骤 for step in steps: if not await step(config, progress_callback): return False + # 如果是全量部署模式,提示用户到网页端完成 Agent 配置 + if config.deployment_mode == "full": + self.state.add_log("✓ 基础服务部署完成") + self.state.add_log("请访问网页管理界面完成 Agent 服务配置") + self.state.add_log(f"管理界面地址: http://{config.server_ip}:8080") + return True async def _execute_install_command( @@ -870,9 +880,11 @@ class DeploymentService: api_url = f"http://{server_ip}:{server_port}/api/user" http_ok = 200 # HTTP OK 状态码 + self.state.add_log("等待 openEuler Intelligence 服务就绪") + async with httpx.AsyncClient(timeout=httpx.Timeout(5.0)) as client: - self.state.add_log("等待 openEuler Intelligence 服务就绪") for attempt in range(1, max_attempts + 1): + logger.debug("第 %d 次检查 openEuler Intelligence 服务状态...", attempt) if progress_callback: progress_callback(self.state) @@ -884,7 +896,7 @@ class DeploymentService: return True except httpx.ConnectError: - self.state.add_log(f"无法连接到 {api_url}") + pass except httpx.TimeoutException: self.state.add_log(f"连接 {api_url} 超时") except (httpx.RequestError, OSError) as e: @@ -904,7 +916,7 @@ class DeploymentService: """运行 Agent 初始化脚本""" self.state.current_step = 7 self.state.current_step_name = "初始化 Agent 服务" - self.state.add_log("正在检查 framework 服务连通性...") + self.state.add_log("正在检查 openEuler Intelligence 后端服务状态...") if progress_callback: progress_callback(self.state) @@ -913,12 +925,12 @@ class DeploymentService: server_ip = config.server_ip or "127.0.0.1" server_port = 8002 - # 检查 framework 服务连通性 + # 检查 openEuler Intelligence 后端服务状态 if not await self._check_framework_service_health(server_ip, server_port, progress_callback): - self.state.add_log("✗ Framework 服务连通性检查失败") + self.state.add_log("✗ openEuler Intelligence 服务检查失败") return False - self.state.add_log("✓ Framework 服务连通性检查通过,开始初始化 Agent...") + self.state.add_log("✓ openEuler Intelligence 服务检查通过,开始初始化 Agent...") if progress_callback: progress_callback(self.state) diff --git a/src/app/deployment/ui.py b/src/app/deployment/ui.py index 00d5ea4..ed0da57 100644 --- a/src/app/deployment/ui.py +++ b/src/app/deployment/ui.py @@ -405,7 +405,7 @@ class DeploymentConfigScreen(ModalScreen[bool]): # 描述区域,显示当前部署模式的详细说明 with Horizontal(classes="form-row"): yield Static( - "轻量部署:仅部署框架服务。", + "轻量部署:仅部署框架服务,自动初始化 Agent。", id="deployment_mode_desc", classes="form-input", ) @@ -533,10 +533,10 @@ class DeploymentConfigScreen(ModalScreen[bool]): # 如果当前为轻量,则切换到全量 if btn.label and "轻量" in str(btn.label): btn.label = "全量部署" - desc.update("全量部署:部署框架服务、Web 界面和 RAG 组件。") + desc.update("全量部署:部署框架服务、Web 界面和 RAG 组件,需手动配置 Agent。") else: btn.label = "轻量部署" - desc.update("轻量部署:仅部署框架服务。") + desc.update("轻量部署:仅部署框架服务,自动初始化 Agent。") except (AttributeError, ValueError): # 查询失败或属性错误时忽略 return -- Gitee From 270ad289b1b42af364fb8a233bdbec19822e4920 Mon Sep 17 00:00:00 2001 From: Hongyu Shi Date: Tue, 19 Aug 2025 11:30:26 +0800 Subject: [PATCH 4/5] =?UTF-8?q?fix(agent):=20=E4=BF=AE=E5=A4=8D=E5=8A=A0?= =?UTF-8?q?=E8=BD=BD=20MCP=20=E9=85=8D=E7=BD=AE=20JSON=20=E7=9A=84?= =?UTF-8?q?=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Hongyu Shi --- src/app/deployment/agent.py | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/src/app/deployment/agent.py b/src/app/deployment/agent.py index 71544de..3a0e8e3 100644 --- a/src/app/deployment/agent.py +++ b/src/app/deployment/agent.py @@ -112,11 +112,11 @@ class McpConfigLoader: config_data = json.load(f) return McpConfig( - name=name, - description=name, - overview=name, - config=config_data, - mcp_type="sse", # 默认类型 + name=config_data.get("name", name), + description=config_data.get("description", name), + overview=config_data.get("overview", name), + config=config_data.get("config", {}), + mcp_type=config_data.get("mcpType", "sse"), ) @@ -352,7 +352,22 @@ class AgentManager: """初始化智能体管理器""" self.api_client = ApiClient(server_ip, server_port) self.config_manager = ConfigManager() - self.mcp_config_dir = Path("/usr/lib/openeuler-intelligence/scripts/5-resource/mcp_config") + + # 尝试多个可能的配置路径 + possible_paths = [ + Path("/usr/lib/openeuler-intelligence/scripts/5-resource/mcp_config"), # 生产环境 + Path("scripts/deploy/5-resource/mcp_config"), # 开发环境(相对路径) + Path(__file__).parent.parent.parent.parent / "scripts/deploy/5-resource/mcp_config", # 开发环境(绝对路径) + ] + + self.mcp_config_dir = possible_paths[0] # 默认使用生产环境路径 + for path in possible_paths: + if path.exists(): + self.mcp_config_dir = path + logger.info("使用 MCP 配置目录: %s", path) + break + else: + logger.warning("未找到 MCP 配置目录,使用默认路径: %s", self.mcp_config_dir) async def initialize_agents( self, -- Gitee From 0230b24fb76679dc961859842da1df55a260dbd0 Mon Sep 17 00:00:00 2001 From: Hongyu Shi Date: Tue, 19 Aug 2025 14:35:28 +0800 Subject: [PATCH 5/5] =?UTF-8?q?mcp:=20=E7=A7=BB=E9=99=A4=E6=9A=82=E4=B8=8D?= =?UTF-8?q?=E5=8F=AF=E7=94=A8=E7=9A=84=20sysTrace=20MCP=20=E9=85=8D?= =?UTF-8?q?=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Hongyu Shi --- .../mcp_config/sysTrace_mcp/config.json | 16 ---------------- 1 file changed, 16 deletions(-) delete mode 100644 scripts/deploy/5-resource/mcp_config/sysTrace_mcp/config.json diff --git a/scripts/deploy/5-resource/mcp_config/sysTrace_mcp/config.json b/scripts/deploy/5-resource/mcp_config/sysTrace_mcp/config.json deleted file mode 100644 index 6a7684e..0000000 --- a/scripts/deploy/5-resource/mcp_config/sysTrace_mcp/config.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "name": "systrace_mcp_server", - "overview": "systrace 运维 mcp 服务", - "description": "systrace 运维 mcp 服务", - "mcpType": "sse", - "author": "root", - "config": { - "env": {}, - "autoApprove": [], - "disabled": false, - "auto_install": false, - "description": "", - "timeout": 60, - "url": "http://127.0.0.1:12145/sse" - } -} \ No newline at end of file -- Gitee