diff --git a/scripts/deploy/5-resource/mcp-servers.rpmlist b/scripts/deploy/5-resource/mcp-servers.rpmlist new file mode 100644 index 0000000000000000000000000000000000000000..cc3226ed92ff373d02e335fbf4ea63bef93fff1d --- /dev/null +++ b/scripts/deploy/5-resource/mcp-servers.rpmlist @@ -0,0 +1,2 @@ +mcp-servers-perf +mcp-servers-remote-shell diff --git a/scripts/deploy/5-resource/sysTrace.rpmlist b/scripts/deploy/5-resource/sysTrace.rpmlist new file mode 100644 index 0000000000000000000000000000000000000000..fee84624ac6d5263269bf7fe67ebb094437b8779 --- /dev/null +++ b/scripts/deploy/5-resource/sysTrace.rpmlist @@ -0,0 +1,2 @@ +sysTrace-failslow +sysTrace-mcpserver diff --git a/scripts/deploy/deploy.spec b/scripts/deploy/deploy.spec deleted file mode 100644 index a027d6bef39288d6c33383d68a652cca84b2f868..0000000000000000000000000000000000000000 --- a/scripts/deploy/deploy.spec +++ /dev/null @@ -1,39 +0,0 @@ -Name: openEuler-Intelligence-Installer -Version: 1.0 -Release: 2 -Summary: Deployment scripts package for openEuler 24.03 SP2 - -License: MIT -URL: https://gitee.com/openeuler/euler-copilot-shell.git -Source0: %{name}-%{version}.tar.gz - -BuildArch: noarch - -%description -This package contains deployment scripts and resources. - -%prep -%setup -q - -%install -# 创建安装目录 -mkdir -p %{buildroot}/usr/lib/openeuler-intelligence/{scripts,resources} - -# 复制脚本和资源 -install -m 755 deploy.sh %{buildroot}/usr/lib/openeuler-intelligence/scripts/deploy -cp -r 0-one-click-deploy 1-check-env 2-install-dependency 3-install-server 4-other-script 5-resource %{buildroot}/usr/lib/openeuler-intelligence/scripts/ -chmod -R +x %{buildroot}/usr/lib/openeuler-intelligence/scripts/ - -mkdir -p %{buildroot}/usr/bin -ln -sf /usr/lib/openeuler-intelligence/scripts/deploy %{buildroot}/usr/bin/openEuler-Intelligence-Installer - -%files -/usr/lib/openeuler-intelligence -/usr/bin/openEuler-Intelligence-Installer - -%changelog -* Wed Aug 20 2025 houxu 'houxu5@h-partners.com' - 1.0-2 -- update scripts path - -* Mon Jul 21 2025 houxu 'houxu5@h-partners.com' - 1.0-1 -- Initial package \ No newline at end of file diff --git a/scripts/tools/uninstaller.sh b/scripts/tools/uninstaller.sh index 17dc8942bb902f53e1f2f65d95d106d7df672666..ca08fa1d5ac71b52794243135f5be9b16712d4b0 100755 --- a/scripts/tools/uninstaller.sh +++ b/scripts/tools/uninstaller.sh @@ -12,8 +12,25 @@ fi set -e echo "Stopping services..." -systemctl stop framework || true -systemctl stop rag || true +# For each expected service, first check if the unit file exists, then stop if running and disable it. +for svc in framework rag; do + unit="${svc}.service" + # Check if the service unit exists on the system + if systemctl list-unit-files --type=service | awk '{print $1}' | grep -Fxq "$unit"; then + # If the service is active/running, stop it; otherwise just report + if systemctl is-active --quiet "$unit"; then + echo "Stopping $unit ..." + systemctl stop "$unit" || true + else + echo "$unit is not running." + fi + # Attempt to disable the unit (may already be disabled) + echo "Disabling $unit ..." + systemctl disable "$unit" || true + else + echo "$unit not found; skipping." + fi +done echo "Removing packages..." dnf remove -y openeuler-intelligence-* || true @@ -41,4 +58,21 @@ done echo "Removing configuration template..." rm -f /etc/openEuler-Intelligence/smart-shell-template.json +echo "Uninstalling built-in MCP servers ..." +# Check for running systrace-mcpserver services and stop/disable them if present. +services=$(systemctl list-units --type=service --state=running | awk '{print $1}' | grep -E '^systrace-mcpserver' || true) +if [ -n "$services" ]; then + for service in $services; do + echo "Stopping $service ..." + systemctl stop "$service" || true + echo "Disabling $service ..." + systemctl disable "$service" || true + done +else + echo "No running systrace-mcpserver services found." +fi + +dnf remove -y sysTrace-* || true +dnf remove -y mcp-servers-perf mcp-servers-remote-shell || true + echo "Uninstallation complete." diff --git a/src/app/css/styles.tcss b/src/app/css/styles.tcss index 10ab2eae5b90c4fadd3a20d4533ab3514ec0142c..fbe381f9cd2b84439fbf2a7d63a2d1d8cdae5cf8 100644 --- a/src/app/css/styles.tcss +++ b/src/app/css/styles.tcss @@ -90,6 +90,7 @@ Static { width: 20%; content-align: right middle; padding-right: 1; + margin-top: 1; } /* 设置值样式 */ @@ -105,8 +106,16 @@ Static { width: 40%; } +#save-btn, #cancel-btn { + width: auto; + min-width: 16; + height: 3; + margin: 0 1; +} + /* 操作按钮容器样式 */ #action-buttons { + align: center middle; dock: bottom; height: auto; margin-top: 1; @@ -254,7 +263,6 @@ Static { #input-container.mcp-mode { height: auto; min-height: 10; - max-height: 15; } /* 正常模式下的输入容器 */ diff --git a/src/app/deployment/agent.py b/src/app/deployment/agent.py index 9415838bf562f4ba560901778115b61193ca82d5..ad0e8983e31d16084087d303fb40efc6a4f6c780 100644 --- a/src/app/deployment/agent.py +++ b/src/app/deployment/agent.py @@ -23,7 +23,7 @@ import httpx from config.manager import ConfigManager from log.manager import get_logger -from .models import DeploymentState +from .models import AgentInitStatus, DeploymentState if TYPE_CHECKING: from collections.abc import Callable @@ -243,8 +243,12 @@ class ApiClient: return False if status in ("init", "installing"): - logger.debug("MCP 服务 %s %s中... (第 %d 次检查)", service_id, - "初始化" if status == "init" else "安装", attempt + 1) + logger.debug( + "MCP 服务 %s %s中... (第 %d 次检查)", + service_id, + "初始化" if status == "init" else "安装", + attempt + 1, + ) elif status is None: logger.debug("MCP 服务 %s 状态检查失败,继续等待... (第 %d 次检查)", service_id, attempt + 1) else: @@ -254,8 +258,7 @@ class ApiClient: attempt += 1 if attempt * check_interval >= max_wait_time: # 这里不返回 False,而是继续等待,因为要求只要接口能打通就一直等 - logger.warning("MCP 服务安装等待超时: %s (已等待 %d 秒,但将继续尝试)", - service_id, max_wait_time) + logger.warning("MCP 服务安装等待超时: %s (已等待 %d 秒,但将继续尝试)", service_id, max_wait_time) await asyncio.sleep(check_interval) @@ -353,60 +356,83 @@ class AgentManager: self.api_client = ApiClient(server_ip, server_port) self.config_manager = ConfigManager() - # 尝试多个可能的配置路径 - possible_paths = [ - Path("/usr/lib/openeuler-intelligence/scripts/5-resource/mcp_config"), # 生产环境 - Path("scripts/deploy/5-resource/mcp_config"), # 开发环境(相对路径) - Path(__file__).parent.parent.parent.parent / "scripts/deploy/5-resource/mcp_config", # 开发环境(绝对路径) + resource_paths = [ + Path("/usr/lib/openeuler-intelligence/scripts/5-resource"), # 生产环境 + Path("scripts/deploy/5-resource"), # 开发环境(相对路径) + Path(__file__).parent.parent.parent / "scripts/deploy/5-resource", # 开发环境(绝对路径) ] - self.mcp_config_dir = possible_paths[0] # 默认使用生产环境路径 - for path in possible_paths: - if path.exists(): - self.mcp_config_dir = path - logger.info("使用 MCP 配置目录: %s", path) - break - else: - logger.warning("未找到 MCP 配置目录,使用默认路径: %s", self.mcp_config_dir) + self.resource_dir = next((p for p in resource_paths if p.exists()), None) + if not self.resource_dir: + logger.error("[DeploymentHelper] 未找到有效的资源路径") + return + logger.info("[DeploymentHelper] 使用资源路径: %s", self.resource_dir) + + self.mcp_config_dir = self.resource_dir / "mcp_config" async def initialize_agents( self, progress_callback: Callable[[DeploymentState], None] | None = None, - ) -> bool: - """初始化智能体""" + ) -> AgentInitStatus: + """ + 初始化智能体 + + Returns: + AgentInitStatus: 初始化状态 (SUCCESS/SKIPPED/FAILED) + + """ state = DeploymentState() - self._report_progress(state, "🚀 开始初始化智能体...", progress_callback) + self._report_progress(state, "[bold blue]开始初始化智能体...[/bold blue]", progress_callback) try: + # 预处理:检查必要的 RPM 包可用性 + rpm_availability_result = await self._check_prerequisite_packages_availability(state, progress_callback) + if rpm_availability_result == AgentInitStatus.SKIPPED: + return AgentInitStatus.SKIPPED + + # 安装必要的 RPM 包 + if not await self._install_prerequisite_packages(state, progress_callback): + return AgentInitStatus.FAILED + # 加载配置 configs = await self._load_mcp_configs(state, progress_callback) if not configs: - return False + return AgentInitStatus.FAILED # 处理 MCP 服务 - service_ids = await self._process_all_mcp_services(configs, state, progress_callback) - if not service_ids: - self._report_progress(state, "❌ 所有 MCP 服务处理失败", progress_callback) - return False + os_service_ids, systrace_service_ids = await self._process_all_mcp_services( + configs, + state, + progress_callback, + ) + + if not os_service_ids and not systrace_service_ids: + self._report_progress(state, "[red]所有 MCP 服务处理失败[/red]", progress_callback) + return AgentInitStatus.FAILED # 创建智能体 - app_id = await self._create_and_publish_agent(service_ids, state, progress_callback) + default_app_id = await self._create_multiple_agents( + os_service_ids, + systrace_service_ids, + state, + progress_callback, + ) self._report_progress( state, - f"🎉 智能体初始化完成! App ID: {app_id}", + f"[bold green]智能体初始化完成! 默认 App ID: {default_app_id}[/bold green]", progress_callback, ) - logger.info("智能体初始化成功完成,App ID: %s", app_id) + logger.info("智能体初始化成功完成,默认 App ID: %s", default_app_id) - except Exception as e: - error_msg = f"智能体初始化失败: {e}" - self._report_progress(state, f"❌ {error_msg}", progress_callback) + except Exception: + error_msg = "智能体初始化失败" + self._report_progress(state, f"[red]{error_msg}[/red]", progress_callback) logger.exception(error_msg) - return False + return AgentInitStatus.FAILED else: - return True + return AgentInitStatus.SUCCESS def _report_progress( self, @@ -425,16 +451,16 @@ class AgentManager: callback: Callable[[DeploymentState], None] | None, ) -> list[tuple[Path, McpConfig]]: """加载 MCP 配置""" - self._report_progress(state, "📋 加载 MCP 配置文件...", callback) + self._report_progress(state, "[cyan]加载 MCP 配置文件...[/cyan]", callback) config_loader = McpConfigLoader(self.mcp_config_dir) configs = config_loader.load_all_configs() if not configs: - self._report_progress(state, "⚠️ 未找到 MCP 配置文件", callback) + self._report_progress(state, "[yellow]未找到 MCP 配置文件[/yellow]", callback) return [] - self._report_progress(state, f"✅ 成功加载 {len(configs)} 个 MCP 配置", callback) + self._report_progress(state, f"[green]成功加载 {len(configs)} 个 MCP 配置[/green]", callback) return configs async def _process_all_mcp_services( @@ -442,45 +468,82 @@ class AgentManager: configs: list[tuple[Path, McpConfig]], state: DeploymentState, callback: Callable[[DeploymentState], None] | None, - ) -> list[str]: - """处理所有 MCP 服务""" - service_ids = [] - for _config_path, config in configs: - self._report_progress(state, f"🔧 处理 MCP 服务: {config.name}", callback) + ) -> tuple[list[str], list[str]]: + """ + 处理所有 MCP 服务 + + Returns: + tuple[list[str], list[str]]: (os_service_ids, systrace_service_ids) + + """ + os_service_ids = [] + systrace_service_ids = [] + + for config_path, config in configs: + self._report_progress(state, f"[magenta]处理 MCP 服务: {config.name}[/magenta]", callback) service_id = await self._process_mcp_service(config, state, callback) if service_id: - service_ids.append(service_id) + # 根据配置路径判断是否为 sysTrace 相关服务 + if "systrace" in config_path.parent.name.lower() or "systrace" in config.name.lower(): + systrace_service_ids.append(service_id) + else: + os_service_ids.append(service_id) else: - self._report_progress(state, f"❌ MCP 服务 {config.name} 处理失败", callback) + self._report_progress(state, f"[red]MCP 服务 {config.name} 处理失败[/red]", callback) - return service_ids + return os_service_ids, systrace_service_ids - async def _create_and_publish_agent( + async def _create_multiple_agents( self, - service_ids: list[str], + os_service_ids: list[str], + systrace_service_ids: list[str], state: DeploymentState, callback: Callable[[DeploymentState], None] | None, ) -> str: - """创建并发布智能体""" - self._report_progress( - state, - f"🤖 创建智能体 (包含 {len(service_ids)} 个 MCP 服务)", - callback, - ) + """创建多个智能体,返回默认智能体 ID""" + default_app_id = None - app_id = await self.api_client.create_agent( - "OS 智能助手", - "OS 智能助手", - service_ids, - ) + # 创建 OS 智能助手(如果有相应的服务) + if os_service_ids: + self._report_progress( + state, + f"[bold cyan]创建 OS 智能助手 (包含 {len(os_service_ids)} 个 MCP 服务)[/bold cyan]", + callback, + ) - await self.api_client.publish_agent(app_id) + os_app_id = await self.api_client.create_agent( + "OS 智能助手", + "openEuler 智能助手", + os_service_ids, + ) + await self.api_client.publish_agent(os_app_id) + + self._report_progress(state, "[green]OS 智能助手创建成功[/green]", callback) + default_app_id = os_app_id # OS 智能助手作为默认应用 - self._report_progress(state, "💾 保存智能体配置...", callback) - self.config_manager.set_default_app(app_id) + # 创建慢卡检测智能助手(如果有相应的服务) + if systrace_service_ids: + self._report_progress( + state, + f"[bold magenta]创建慢卡检测智能助手 (包含 {len(systrace_service_ids)} 个 MCP 服务)[/bold magenta]", + callback, + ) + + systrace_app_id = await self.api_client.create_agent( + "慢卡检测智能助手", + "检测集群中的慢卡问题", + systrace_service_ids, + ) + await self.api_client.publish_agent(systrace_app_id) - return app_id + self._report_progress(state, "[green]慢卡检测智能助手创建成功[/green]", callback) + + if default_app_id: + self._report_progress(state, "[dim]保存默认智能体配置...[/dim]", callback) + self.config_manager.set_default_app(default_app_id) + + return default_app_id or "" async def _register_mcp_service( self, @@ -489,7 +552,7 @@ class AgentManager: callback: Callable[[DeploymentState], None] | None, ) -> str: """注册 MCP 服务""" - self._report_progress(state, f" 📝 注册 {config.name}...", callback) + self._report_progress(state, f" [blue]注册 {config.name}...[/blue]", callback) return await self.api_client.register_mcp_service(config) async def _install_and_wait_mcp_service( @@ -500,12 +563,12 @@ class AgentManager: callback: Callable[[DeploymentState], None] | None, ) -> bool: """安装并等待 MCP 服务完成""" - self._report_progress(state, f" ⬇️ 安装 {config_name} (ID: {service_id})...", callback) + self._report_progress(state, f" [cyan]安装 {config_name} (ID: {service_id})...[/cyan]", callback) await self.api_client.install_mcp_service(service_id) - self._report_progress(state, f" ⏳ 等待 {config_name} 安装完成...", callback) + self._report_progress(state, f" [dim]等待 {config_name} 安装完成...[/dim]", callback) if not await self.api_client.wait_for_installation(service_id): - self._report_progress(state, f" ❌ {config_name} 安装超时", callback) + self._report_progress(state, f" [red]{config_name} 安装超时[/red]", callback) return False return True @@ -518,9 +581,9 @@ class AgentManager: callback: Callable[[DeploymentState], None] | None, ) -> None: """激活 MCP 服务""" - self._report_progress(state, f" 🔄 激活 {config_name}...", callback) + self._report_progress(state, f" [yellow]激活 {config_name}...[/yellow]", callback) await self.api_client.activate_mcp_service(service_id) - self._report_progress(state, f" ✅ {config_name} 处理完成", callback) + self._report_progress(state, f" [green]{config_name} 处理完成[/green]", callback) async def _process_mcp_service( self, @@ -535,7 +598,7 @@ class AgentManager: if not valid: self._report_progress( state, - f" ❌ MCP 服务 {config.name} SSE Endpoint 验证失败", + f" [red]MCP 服务 {config.name} SSE Endpoint 验证失败[/red]", callback, ) return None @@ -551,7 +614,7 @@ class AgentManager: await self._activate_mcp_service(service_id, config.name, state, callback) except (ApiError, httpx.RequestError, Exception) as e: - self._report_progress(state, f" ❌ {config.name} 处理失败: {e}", callback) + self._report_progress(state, f" [red]{config.name} 处理失败: {e}[/red]", callback) logger.exception("MCP 服务 %s 处理失败", config.name) return None @@ -568,7 +631,7 @@ class AgentManager: url = config.config.get("url") or "" self._report_progress( state, - f"🔍 验证 SSE Endpoint: {config.name} -> {url}", + f"[magenta]验证 SSE Endpoint: {config.name} -> {url}[/magenta]", callback, ) try: @@ -580,7 +643,7 @@ class AgentManager: if response.status_code != HTTP_OK: self._report_progress( state, - f" ❌ {config.name} URL 响应码非 200: {response.status_code}", + f" [red]{config.name} URL 响应码非 200: {response.status_code}[/red]", callback, ) return False @@ -588,13 +651,357 @@ class AgentManager: if "text/event-stream" not in content_type: self._report_progress( state, - f" ❌ {config.name} Content-Type 非 SSE: {content_type}", + f" [red]{config.name} Content-Type 非 SSE: {content_type}[/red]", callback, ) return False - self._report_progress(state, f" ✅ {config.name} SSE Endpoint 验证通过", callback) + self._report_progress(state, f" [green]{config.name} SSE Endpoint 验证通过[/green]", callback) return True except Exception as e: - self._report_progress(state, f" ❌ {config.name} SSE 验证失败: {e}", callback) + self._report_progress(state, f" [red]{config.name} SSE 验证失败:[/red] {e}", callback) logger.exception("验证 SSE Endpoint 失败: %s", url) return False + + async def _install_prerequisite_packages( + self, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> bool: + """安装必要的 RPM 包(已知包可用的情况下)""" + try: + # 检查是否存在以 "systrace" 开头的子目录(不区分大小写) + systrace_exists = self._check_systrace_config(state, callback) + + # 安装包(此时已知包是可用的) + if systrace_exists: + # 安装 sysTrace.rpmlist 中的包 + if not await self._install_rpm_packages("sysTrace.rpmlist", state, callback): + return False + + # 设置 systrace-mcpserver 服务开机启动并立即启动 + if not await self._setup_systrace_service(state, callback): + return False + + # 安装 mcp-servers.rpmlist 中的包 + return await self._install_rpm_packages("mcp-servers.rpmlist", state, callback) + + except Exception: + error_msg = "安装必要包失败" + self._report_progress(state, f"[red]{error_msg}[/red]", callback) + logger.exception(error_msg) + return False + + async def _check_rpm_packages_availability( + self, + rpm_list_files: list[str], + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> bool: + """检查 RPM 包是否在 yum 源中可用""" + self._report_progress(state, "[cyan]检查 RPM 包在 yum 源中的可用性...[/cyan]", callback) + + if not self.resource_dir: + self._report_progress( + state, + "[red]资源目录未找到,无法检查 RPM 包可用性[/red]", + callback, + ) + logger.error("资源目录未找到,无法检查 RPM 包可用性") + return False + + all_packages = [] + + # 收集所有需要检查的包 + for rpm_list_file in rpm_list_files: + rpm_list_path = self.resource_dir / rpm_list_file + + if not rpm_list_path.exists(): + self._report_progress( + state, + f"[yellow]RPM 列表文件不存在: {rpm_list_file},跳过检查[/yellow]", + callback, + ) + logger.warning("RPM 列表文件不存在: %s", rpm_list_path) + continue + + try: + with rpm_list_path.open(encoding="utf-8") as f: + packages = [line.strip() for line in f if line.strip() and not line.startswith("#")] + all_packages.extend(packages) + except Exception as e: + self._report_progress( + state, + f"[red]读取 RPM 列表文件失败:[/red] {rpm_list_file} - {e}", + callback, + ) + logger.exception("读取 RPM 列表文件失败: %s", rpm_list_path) + return False + + if not all_packages: + self._report_progress(state, "[dim]没有要检查的 RPM 包[/dim]", callback) + return True + + # 检查每个包的可用性 + unavailable_packages = [] + + for package in all_packages: + # 使用 dnf list available 检查包是否可用 + check_cmd = f"dnf list available {package}" + + try: + process = await asyncio.create_subprocess_shell( + check_cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + stdout, stderr = await process.communicate() + + if process.returncode != 0: + unavailable_packages.append(package) + logger.warning("RPM 包不可用: %s", package) + + except Exception as e: + self._report_progress( + state, + f" [red]检查包 {package} 失败:[/red] {e}", + callback, + ) + logger.exception("检查 RPM 包可用性失败: %s", package) + unavailable_packages.append(package) + + # 如果有不可用的包,返回 False + if unavailable_packages: + self._report_progress( + state, + f"[dim]以下 RPM 包不可用,跳过智能体初始化: {', '.join(unavailable_packages)}[/dim]", + callback, + ) + logger.error("发现不可用的 RPM 包,跳过智能体初始化: %s", unavailable_packages) + return False + + self._report_progress( + state, + "[green]所有 RPM 包均可用,继续智能体初始化[/green]", + callback, + ) + logger.info("所有 RPM 包均可用") + return True + + async def _check_prerequisite_packages_availability( + self, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> AgentInitStatus: + """ + 检查必要的 RPM 包是否在 yum 源中可用 + + Returns: + AgentInitStatus: SUCCESS 表示所有包可用,SKIPPED 表示有包不可用应跳过 + + """ + try: + # 准备要检查的 RPM 列表文件 + rpm_files_to_check = ["mcp-servers.rpmlist"] + + # 检查是否存在以 "systrace" 开头的子目录(不区分大小写) + systrace_exists = self._check_systrace_config(state, callback) + if systrace_exists: + rpm_files_to_check.append("sysTrace.rpmlist") + + # 检查包可用性 + packages_available = await self._check_rpm_packages_availability(rpm_files_to_check, state, callback) + + if not packages_available: + self._report_progress( + state, + "[yellow]MCP Server 相关 RPM 包可用性检查失败,跳过智能体初始化,其他部署步骤将继续进行[/yellow]", + callback, + ) + return AgentInitStatus.SKIPPED + + except Exception: + error_msg = "检查 RPM 包可用性失败" + self._report_progress(state, f"[red]{error_msg}[/red]", callback) + logger.exception(error_msg) + return AgentInitStatus.SKIPPED # 检查失败也视为跳过,而不是整个部署失败 + + else: + return AgentInitStatus.SUCCESS + + def _check_systrace_config( + self, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> bool: + """检查是否存在以 systrace 开头的配置目录""" + self._report_progress(state, "[cyan]检查 sysTrace 配置...[/cyan]", callback) + + if not self.resource_dir or not self.mcp_config_dir: + self._report_progress(state, "[yellow]资源目录或 MCP 配置目录不存在[/yellow]", callback) + return False + + if not self.mcp_config_dir.exists(): + self._report_progress(state, "[yellow]MCP 配置目录不存在[/yellow]", callback) + return False + + for subdir in self.mcp_config_dir.iterdir(): + if subdir.is_dir() and subdir.name.lower().startswith("systrace"): + self._report_progress(state, f"[green]发现 sysTrace 配置: {subdir.name}[/green]", callback) + logger.info("发现 sysTrace 配置目录: %s", subdir.name) + return True + + self._report_progress(state, "[dim]未发现 sysTrace 配置[/dim]", callback) + return False + + async def _install_rpm_packages( + self, + rpm_list_file: str, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> bool: + """安装指定 RPM 列表文件中的包""" + if not self.resource_dir: + self._report_progress( + state, + f"[red]资源目录未找到,无法安装 {rpm_list_file}[/red]", + callback, + ) + logger.error("资源目录未找到,无法安装 RPM 包: %s", rpm_list_file) + return False + + rpm_list_path = self.resource_dir / rpm_list_file + + if not rpm_list_path.exists(): + self._report_progress( + state, + f"[yellow]RPM 列表文件不存在: {rpm_list_file}[/yellow]", + callback, + ) + logger.warning("RPM 列表文件不存在: %s", rpm_list_path) + return True # 文件不存在不算失败,继续执行 + + self._report_progress(state, f"[cyan]安装 {rpm_list_file} 中的 RPM 包...[/cyan]", callback) + + try: + # 读取 RPM 包列表 + with rpm_list_path.open(encoding="utf-8") as f: + packages = [line.strip() for line in f if line.strip() and not line.startswith("#")] + + if not packages: + self._report_progress(state, f"[dim]{rpm_list_file} 中没有要安装的包[/dim]", callback) + return True + + # 使用 dnf 安装包 + package_list = " ".join(packages) + install_cmd = f"sudo dnf install -y {package_list}" + + self._report_progress( + state, + f" [blue]执行安装命令: {install_cmd}[/blue]", + callback, + ) + logger.info("执行 RPM 包安装命令: %s", install_cmd) + + # 使用 asyncio.create_subprocess_shell 执行命令 + process = await asyncio.create_subprocess_shell( + install_cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + + stdout, _ = await process.communicate() + output = stdout.decode("utf-8") if stdout else "" + + if process.returncode == 0: + self._report_progress( + state, + f" [green]{rpm_list_file} 中的包安装成功[/green]", + callback, + ) + logger.info("RPM 包安装成功: %s", package_list) + else: + self._report_progress( + state, + f" [red]{rpm_list_file} 中的包安装失败 (返回码: {process.returncode})[/red]", + callback, + ) + logger.error("RPM 包安装失败: %s, 输出: %s", package_list, output) + return False + + except Exception: + error_msg = f"安装 {rpm_list_file} 失败" + self._report_progress(state, f" [red]{error_msg}[/red]", callback) + logger.exception(error_msg) + return False + + return True + + async def _setup_systrace_service( + self, + state: DeploymentState, + callback: Callable[[DeploymentState], None] | None, + ) -> bool: + """设置 systrace-mcpserver 服务""" + service_name = "systrace-mcpserver" + self._report_progress(state, f"[magenta]设置 {service_name} 服务...[/magenta]", callback) + + try: + # 启用服务开机启动 + enable_cmd = f"sudo systemctl enable {service_name}" + self._report_progress(state, f" [cyan]设置开机启动: {enable_cmd}[/cyan]", callback) + + process = await asyncio.create_subprocess_shell( + enable_cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + + stdout, _ = await process.communicate() + output = stdout.decode("utf-8") if stdout else "" + + if process.returncode != 0: + self._report_progress( + state, + f" [red]设置 {service_name} 开机启动失败: {output}[/red]", + callback, + ) + logger.error("设置服务开机启动失败: %s, 输出: %s", service_name, output) + return False + + # 启动服务 + start_cmd = f"sudo systemctl start {service_name}" + self._report_progress(state, f" [blue]启动服务: {start_cmd}[/blue]", callback) + + process = await asyncio.create_subprocess_shell( + start_cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + + stdout, _ = await process.communicate() + output = stdout.decode("utf-8") if stdout else "" + + if process.returncode == 0: + self._report_progress( + state, + f" [green]{service_name} 服务启动成功[/green]", + callback, + ) + logger.info("sysTrace 服务启动成功: %s", service_name) + else: + self._report_progress( + state, + f" [red]{service_name} 服务启动失败: {output}[/red]", + callback, + ) + logger.error("sysTrace 服务启动失败: %s, 输出: %s", service_name, output) + return False + + except Exception: + error_msg = f"设置 {service_name} 服务失败" + self._report_progress(state, f" [red]{error_msg}[/red]", callback) + logger.exception(error_msg) + return False + + return True diff --git a/src/app/deployment/models.py b/src/app/deployment/models.py index 33f34a7820115791dc8f5dd16a4c6bd9ff9450bd..273e05afccd7571992cf68391a413027480739de 100644 --- a/src/app/deployment/models.py +++ b/src/app/deployment/models.py @@ -6,13 +6,23 @@ from __future__ import annotations +import re from dataclasses import dataclass, field +from enum import Enum # 常量定义 MAX_TEMPERATURE = 2.0 MIN_TEMPERATURE = 0.0 +class AgentInitStatus(Enum): + """智能体初始化状态""" + + SUCCESS = "success" # 成功完成 + SKIPPED = "skipped" # 跳过(RPM包不可用) + FAILED = "failed" # 失败(其他错误) + + @dataclass class LLMConfig: """ @@ -237,11 +247,124 @@ class DeploymentState: """ 添加日志消息 + 避免输出重复内容,只有当新消息与最后一条消息不同时才添加 + Args: message: 日志消息 """ - self.output_log.append(message) + # 转换 ANSI 颜色标记为 Textual 富文本标记 + rich_message = self._convert_shell_colors_to_rich(message) + + # 如果日志为空,或者新消息与最后一条消息不同,则添加 + if not self.output_log or self.output_log[-1] != rich_message: + self.output_log.append(rich_message) + + def _convert_shell_colors_to_rich(self, text: str) -> str: + r""" + 将 Shell ANSI 颜色码转换为 Textual Rich 标记 + + 基于脚本中实际使用的颜色标记进行转换: + - COLOR_INFO='\033[34m' # 蓝色信息 -> [blue] + - COLOR_SUCCESS='\033[32m' # 绿色成功 -> [green] + - COLOR_ERROR='\033[31m' # 红色错误 -> [red] + - COLOR_WARNING='\033[33m' # 黄色警告 -> [yellow] + - COLOR_RESET='\033[0m' # 重置颜色 -> [/] + + 处理跨行颜色标记,确保 Rich 标记的完整性,避免 MarkupError。 + + Args: + text: 包含 ANSI 颜色码的文本 + + Returns: + 转换后的 Rich 标记文本 + + """ + # ANSI 颜色码到 Rich 标记的映射 + color_map = { + r"\033\[34m": "[blue]", # 蓝色信息 + r"\033\[32m": "[green]", # 绿色成功 + r"\033\[31m": "[red]", # 红色错误 + r"\033\[33m": "[yellow]", # 黄色警告 + r"\033\[0;32m": "[green]", # 绿色 (GREEN 变量) + r"\033\[0;33m": "[yellow]", # 黄色 (YELLOW 变量) + r"\033\[0;34m": "[blue]", # 蓝色 (BLUE 变量) + r"\033\[0m": "[/]", # 重置颜色 + } + + # 应用颜色转换 + result = text + for ansi_code, rich_markup in color_map.items(): + result = re.sub(ansi_code, rich_markup, result) + + # 检查是否存在未配对的Rich标记,避免MarkupError + return self._ensure_balanced_rich_tags(result) + + def _ensure_balanced_rich_tags(self, text: str) -> str: + """ + 确保 Rich 标记的平衡性,避免跨行导致的 MarkupError + + 处理以下情况: + 1. 只有开始标记没有结束标记:自动添加结束标记 + 2. 只有结束标记没有开始标记:移除孤立的结束标记 + 3. 嵌套不当的标记:进行修复 + + Args: + text: 包含 Rich 标记的文本 + + Returns: + 平衡的 Rich 标记文本 + + """ + close_pattern = r"\[/\]" + color_pattern = r"\[(blue|green|red|yellow)\]" + + # 找到所有开始标记和结束标记,包含结束位置 + open_matches = [ + {"pos": match.start(), "end": match.end(), "type": "open", "tag": match.group(1)} + for match in re.finditer(color_pattern, text) + ] + + close_matches = [ + {"pos": match.start(), "end": match.end(), "type": "close", "tag": "close"} + for match in re.finditer(close_pattern, text) + ] + + # 合并并按位置排序 + all_matches = open_matches + close_matches + all_matches.sort(key=lambda x: x["pos"]) + + # 使用栈来跟踪标记平衡 + open_stack = [] + balanced_text = "" + last_pos = 0 + + for match in all_matches: + # 添加匹配前的文本 + balanced_text += text[last_pos : match["pos"]] + + if match["type"] == "open": + # 开始标记:入栈并添加到结果 + open_stack.append(match["tag"]) + balanced_text += f"[{match['tag']}]" + elif match["type"] == "close": + if open_stack: + # 有匹配的开始标记:出栈并添加结束标记 + open_stack.pop() + balanced_text += "[/]" + # 如果没有匹配的开始标记,忽略这个结束标记(不添加到结果中) + + last_pos = match["end"] + + # 添加剩余的文本 + balanced_text += text[last_pos:] + + # 为未闭合的开始标记添加结束标记 + while open_stack: + balanced_text += "[/]" + open_stack.pop() + + return balanced_text def clear_log(self) -> None: """清空日志""" diff --git a/src/app/deployment/service.py b/src/app/deployment/service.py index cf5ef3de0e2b4ba1f2ec293b3a0a187d8b29a5c4..3c4b3f7839dc13150e68c5ff52979888b29c1970 100644 --- a/src/app/deployment/service.py +++ b/src/app/deployment/service.py @@ -20,7 +20,7 @@ from config.manager import ConfigManager from log.manager import get_logger from .agent import AgentManager -from .models import DeploymentConfig, DeploymentState +from .models import AgentInitStatus, DeploymentConfig, DeploymentState if TYPE_CHECKING: from collections.abc import AsyncGenerator, Callable @@ -947,14 +947,19 @@ class DeploymentService: # 初始化 Agent 和 MCP 服务 agent_manager = AgentManager(server_ip=server_ip, server_port=server_port) - success = await agent_manager.initialize_agents(progress_callback) + init_status = await agent_manager.initialize_agents(progress_callback) - if success: + if init_status == AgentInitStatus.SUCCESS: self.state.add_log("✓ Agent 初始化完成") - else: - self.state.add_log("✗ Agent 初始化失败") + return True + + if init_status == AgentInitStatus.SKIPPED: + self.state.add_log("⚠ Agent 初始化已跳过(RPM 包不可用),但部署将继续进行") + return True # 跳过不算失败,继续部署 - return success + # FAILED + self.state.add_log("✗ Agent 初始化失败") + return False async def _create_global_config_template(self, config: DeploymentConfig) -> None: """ diff --git a/src/app/deployment/ui.py b/src/app/deployment/ui.py index 05ee8dc493e41aa688e8e7e6c261011bedecee6e..7be537ecb5a4080ed5657ec30b2d034fec8a5968 100644 --- a/src/app/deployment/ui.py +++ b/src/app/deployment/ui.py @@ -10,6 +10,7 @@ import asyncio import contextlib from typing import TYPE_CHECKING +from rich.errors import MarkupError from textual import on from textual.containers import Container, Horizontal, Vertical from textual.screen import ModalScreen @@ -18,7 +19,6 @@ from textual.widgets import ( Header, Input, Label, - ProgressBar, RichLog, Static, TabbedContent, @@ -620,13 +620,10 @@ class DeploymentProgressScreen(ModalScreen[bool]): min-height: 6; } - #progress_bar { - margin: 0 1; - } - #step_label { min-height: 1; height: auto; + color: $primary; } .log-section { @@ -657,6 +654,7 @@ class DeploymentProgressScreen(ModalScreen[bool]): self.deployment_success = False self.deployment_errors: list[str] = [] self.deployment_progress_value = 0 + self.latest_log: str = "" def compose(self) -> ComposeResult: """组合界面组件""" @@ -665,7 +663,6 @@ class DeploymentProgressScreen(ModalScreen[bool]): with Vertical(classes="progress-section"): yield Static("部署进度:", id="progress_label") - yield ProgressBar(total=FULL_PROGRESS, show_eta=False, id="progress_bar") yield Static("准备开始部署...", id="step_label") with Container(classes="log-section"): @@ -737,9 +734,9 @@ class DeploymentProgressScreen(ModalScreen[bool]): self.deployment_success = False self.deployment_errors.clear() self.deployment_progress_value = 0 # 重置进度记录 + self.latest_log = "" # 重置进度 - self.query_one("#progress_bar", ProgressBar).update(progress=self.deployment_progress_value) self.query_one("#step_label", Static).update("") # 重置按钮状态 @@ -817,7 +814,6 @@ class DeploymentProgressScreen(ModalScreen[bool]): # 更新界面状态 if success: self.deployment_success = True - self.query_one("#progress_bar", ProgressBar).update(progress=FULL_PROGRESS) self.query_one("#step_label", Static).update("部署完成!") self.query_one("#deployment_log", RichLog).write( @@ -852,7 +848,6 @@ class DeploymentProgressScreen(ModalScreen[bool]): # 只有在进度实际前进或者是初始状态时才更新 if progress >= self.deployment_progress_value or self.deployment_progress_value == 0: self.deployment_progress_value = progress - self.query_one("#progress_bar", ProgressBar).update(progress=self.deployment_progress_value) # 更新步骤标签 step_text = f"步骤 {state.current_step}/{state.total_steps}: {state.current_step_name}" @@ -860,15 +855,19 @@ class DeploymentProgressScreen(ModalScreen[bool]): # 添加最新的日志条目 log_widget = self.query_one("#deployment_log", RichLog) - if state.output_log: + if state.output_log and self.latest_log != state.output_log[-1]: # 只显示最新的日志条目 - latest_log = state.output_log[-1] - if latest_log.startswith("✓"): - log_widget.write(f"[green]{latest_log}[/green]") - elif latest_log.startswith("✗"): - log_widget.write(f"[red]{latest_log}[/red]") - else: - log_widget.write(latest_log) + self.latest_log = state.output_log[-1] + try: + if self.latest_log.startswith("✓"): + log_widget.write(f"[green]{self.latest_log}[/green]") + elif self.latest_log.startswith("✗"): + log_widget.write(f"[red]{self.latest_log}[/red]") + else: + log_widget.write(self.latest_log) + except MarkupError: + # 忽略日志消息格式错误 + pass class ErrorMessageScreen(ModalScreen[None]): diff --git a/src/app/settings.py b/src/app/settings.py index 934af6739e129ff39924db82074bc60b5fdb1982..7e10792a443578430e818e9be9ec9658edebbe98 100644 --- a/src/app/settings.py +++ b/src/app/settings.py @@ -91,8 +91,8 @@ class SettingsScreen(Screen): Static("", id="spacer"), # 操作按钮 Horizontal( - Button("保存", id="save-btn", classes="settings-button"), - Button("取消", id="cancel-btn", classes="settings-button"), + Button("保存", id="save-btn", variant="primary"), + Button("取消", id="cancel-btn", variant="default"), id="action-buttons", classes="settings-option", ), diff --git a/src/app/tui.py b/src/app/tui.py index fcc7b12d08e69ee593e832f55e77b2db8fef322c..8f7c791b2ede3338a56745e201d70e498c611e5b 100644 --- a/src/app/tui.py +++ b/src/app/tui.py @@ -268,7 +268,7 @@ class IntelligentTerminal(App): yield FocusableContainer(id="output-container") with Container(id="input-container", classes="normal-mode"): yield CommandInput() - yield Footer() + yield Footer(show_command_palette=False) def action_settings(self) -> None: """打开设置页面""" diff --git a/tests/app/deployment/test_agent_manager.py b/tests/app/deployment/test_agent_manager.py new file mode 100644 index 0000000000000000000000000000000000000000..7e4823a24545c65833dd6aab5034543e1dfde055 --- /dev/null +++ b/tests/app/deployment/test_agent_manager.py @@ -0,0 +1,86 @@ +""" +测试 AgentManager 的 RPM 包安装功能 + +这个脚本会测试: +1. 检查 systrace 配置目录是否存在 +2. 模拟 RPM 包安装过程 + +使用方法: source .venv/bin/activate && PYTHONPATH=src python tests/app/deployment/test_agent_manager.py +""" + +import asyncio +import sys +from pathlib import Path + +from app.deployment.agent import AgentManager +from app.deployment.models import DeploymentState + +# 添加项目路径到 Python 模块搜索路径 +project_root = Path(__file__).parent +sys.path.insert(0, str(project_root / "src")) + + +def progress_callback(state: DeploymentState) -> None: + """打印进度信息""" + if state.output_log: + _output(f"[进度] {state.output_log[-1]}") + + +async def test_agent_manager() -> None: + """测试 AgentManager 的功能""" + _output("开始测试 AgentManager...") + + # 创建 AgentManager 实例 + manager = AgentManager() + + if not manager.resource_dir: + _output("❌ 资源目录未找到,测试终止") + return + + _output(f"✅ 找到资源目录: {manager.resource_dir}") + _output(f"✅ MCP 配置目录: {manager.mcp_config_dir}") + + # 测试检查 systrace 配置 + state = DeploymentState() + systrace_exists = manager._check_systrace_config(state, progress_callback) # noqa: SLF001 + _output(f"sysTrace 配置检查结果: {systrace_exists}") + + # 测试必要包安装功能(模拟) + _output("\n测试必要包安装功能(模拟)...") + result = await manager._install_prerequisite_packages(state, progress_callback) # noqa: SLF001 + _output(f"必要包安装结果: {result}") + + # 测试安装必要的包(仅模拟) + _output("\n测试 RPM 包安装功能(仅检查文件存在性)...") + + # 检查 sysTrace.rpmlist 文件 + systrace_rpm_file = manager.resource_dir / "sysTrace.rpmlist" + if systrace_rpm_file.exists(): + _output(f"✅ 找到 sysTrace.rpmlist: {systrace_rpm_file}") + with systrace_rpm_file.open() as f: + packages = [line.strip() for line in f if line.strip() and not line.startswith("#")] + _output(f" 需要安装的包: {packages}") + else: + _output(f"⚠️ sysTrace.rpmlist 文件不存在: {systrace_rpm_file}") + + # 检查 mcp-servers.rpmlist 文件 + mcp_rpm_file = manager.resource_dir / "mcp-servers.rpmlist" + if mcp_rpm_file.exists(): + _output(f"✅ 找到 mcp-servers.rpmlist: {mcp_rpm_file}") + with mcp_rpm_file.open() as f: + packages = [line.strip() for line in f if line.strip() and not line.startswith("#")] + _output(f" 需要安装的包: {packages}") + else: + _output(f"⚠️ mcp-servers.rpmlist 文件不存在: {mcp_rpm_file}") + + _output("\n✅ 测试完成") + + +def _output(message: str = "") -> None: + """输出消息到标准输出""" + sys.stdout.write(f"{message}\n") + sys.stdout.flush() + + +if __name__ == "__main__": + asyncio.run(test_agent_manager()) diff --git a/tests/app/deployment/test_agent_skip.py b/tests/app/deployment/test_agent_skip.py new file mode 100644 index 0000000000000000000000000000000000000000..659fb4322029772d7d0341c025e0039c93508eee --- /dev/null +++ b/tests/app/deployment/test_agent_skip.py @@ -0,0 +1,93 @@ +""" +测试智能体初始化跳过功能 + +验证当 RPM 包不可用时,智能体初始化会被跳过但部署继续进行。 +使用方法: source .venv/bin/activate && PYTHONPATH=src python tests/app/deployment/test_agent_skip.py +""" + +import asyncio +import sys +import traceback +from pathlib import Path + +from app.deployment.agent import AgentManager +from app.deployment.models import AgentInitStatus, DeploymentState + +# 添加 src 目录到 Python 路径 +src_path = Path(__file__).parent / "src" +sys.path.insert(0, str(src_path)) + + +class TestProgress: + """测试用的进度回调类""" + + def __call__(self, state: DeploymentState) -> None: + """显示进度信息""" + if state.output_log: + # 获取最新的日志条目 + latest_log = state.output_log[-1] + _output(f"[测试] {latest_log}") + + +async def test_agent_init_skip() -> bool: + """测试智能体初始化跳过功能""" + _output("=== 测试智能体初始化跳过功能 ===") + + # 创建 AgentManager 实例 + agent_manager = AgentManager() + + if not agent_manager.resource_dir: + _output("❌ 未找到资源目录,测试失败") + return False + + # 创建测试状态和回调 + callback = TestProgress() + + _output(f"\n资源目录: {agent_manager.resource_dir}") + + # 执行智能体初始化 + _output("\n开始测试智能体初始化...") + init_status = await agent_manager.initialize_agents(callback) + + _output("\n=== 测试结果 ===") + _output(f"初始化状态: {init_status}") + + if init_status == AgentInitStatus.SUCCESS: + _output("✅ 智能体初始化成功") + return True + if init_status == AgentInitStatus.SKIPPED: + _output("⚠️ 智能体初始化已跳过(这是预期结果)") + _output("✅ 测试通过:部署应该继续进行并显示成功") + return True + # FAILED + _output("❌ 智能体初始化失败") + return False + + +async def main() -> None: + """主函数""" + try: + result = await test_agent_init_skip() + + _output("\n=== 总结 ===") + if result: + _output("✅ 测试通过:智能体初始化跳过逻辑正常工作") + _output("📋 部署流程应该显示:'⚠ Agent 初始化已跳过(RPM 包不可用),但部署将继续进行'") + _output("🎯 最终部署结果应该显示为成功") + else: + _output("❌ 测试失败:智能体初始化跳过逻辑有问题") + + except Exception as e: # noqa: BLE001 + _output(f"❌ 测试执行失败: {e}") + + traceback.print_exc() + + +def _output(message: str = "") -> None: + """输出消息到标准输出""" + sys.stdout.write(f"{message}\n") + sys.stdout.flush() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/tests/app/deployment/test_rpm_availability.py b/tests/app/deployment/test_rpm_availability.py new file mode 100644 index 0000000000000000000000000000000000000000..e7e127d548e73a5ab64595bdb0cdc5ddd3e39b55 --- /dev/null +++ b/tests/app/deployment/test_rpm_availability.py @@ -0,0 +1,100 @@ +""" +测试 RPM 包可用性检查功能 + +这个脚本用于测试 agent.py 中添加的 _check_rpm_packages_availability 方法。 +使用方法: source .venv/bin/activate && PYTHONPATH=src python tests/app/deployment/test_rpm_availability.py +""" + +import asyncio +import sys +import traceback +from pathlib import Path + +from app.deployment.agent import AgentManager +from app.deployment.models import DeploymentState + +# 添加 src 目录到 Python 路径 +src_path = Path(__file__).parent / "src" +sys.path.insert(0, str(src_path)) + + +class TestProgress: + """测试用的进度回调类""" + + def __call__(self, state: DeploymentState) -> None: + """显示进度信息""" + if state.output_log: + # 获取最新的日志条目 + latest_log = state.output_log[-1] + _output(f"[测试] {latest_log}") + + +async def test_rpm_availability() -> bool: + """测试 RPM 包可用性检查""" + _output("=== 测试 RPM 包可用性检查 ===") + + # 创建 AgentManager 实例 + agent_manager = AgentManager() + + if not agent_manager.resource_dir: + _output("❌ 未找到资源目录,测试失败") + return False + + # 创建测试状态和回调 + state = DeploymentState() + callback = TestProgress() + + # 准备测试的 RPM 列表文件 + rpm_files = ["mcp-servers.rpmlist", "sysTrace.rpmlist"] + + # 检查文件是否存在 + _output(f"\n资源目录: {agent_manager.resource_dir}") + for rpm_file in rpm_files: + file_path = agent_manager.resource_dir / rpm_file + if file_path.exists(): + _output(f"✅ 找到文件: {rpm_file}") + # 显示文件内容 + with file_path.open(encoding="utf-8") as f: + packages = [line.strip() for line in f if line.strip() and not line.startswith("#")] + _output(f" 包含的包: {packages}") + else: + _output(f"⚠️ 文件不存在: {rpm_file}") + + # 执行可用性检查 + _output("\n开始检查 RPM 包可用性...") + result = await agent_manager._check_rpm_packages_availability(rpm_files, state, callback) # noqa: SLF001 + + _output("\n=== 测试结果 ===") + if result: + _output("✅ 所有 RPM 包均可用") + else: + _output("❌ 存在不可用的 RPM 包") + + return result + + +async def main() -> None: + """主函数""" + try: + result = await test_rpm_availability() + + _output("\n=== 总结 ===") + if result: + _output("✅ 测试通过:可以继续智能体初始化") + else: + _output("❌ 测试失败:应该跳过智能体初始化") + + except Exception as e: # noqa: BLE001 + _output(f"❌ 测试执行失败: {e}") + + traceback.print_exc() + + +def _output(message: str = "") -> None: + """输出消息到标准输出""" + sys.stdout.write(f"{message}\n") + sys.stdout.flush() + + +if __name__ == "__main__": + asyncio.run(main())