diff --git a/mcp_center/requiremenets.txt b/mcp_center/requirements.txt similarity index 100% rename from mcp_center/requiremenets.txt rename to mcp_center/requirements.txt diff --git a/mcp_center/run.sh b/mcp_center/run.sh index dabae182d59a802237a7f38f999230b035ae68ce..3794695284d0c04347a374ea5fea0422f033cf89 100755 --- a/mcp_center/run.sh +++ b/mcp_center/run.sh @@ -1,19 +1,28 @@ #!/bin/bash SERVICE_DIR="/usr/lib/euler-copilot-framework/mcp_center/service" - +SYSTEMD_TARGET_DIR="/etc/systemd/system" /usr/lib/euler-copilot-framework/mcp_center/servers/oe_cli_mcp_server/run.sh /usr/lib/euler-copilot-framework/mcp_center/servers/rag/run.sh +systemctl daemon-reload + for service_file in "$SERVICE_DIR"/*.service; do + # 只保留「文件存在」的核心判断,其他验证全删 if [ -f "$service_file" ]; then service_name=$(basename "$service_file" .service) + dest_service="$SYSTEMD_TARGET_DIR/$service_name.service" + echo "正在载入service: $dest_service" + # 直接复制(-f 强制覆盖已存在的文件,-a 保留权限) + cp -af "$service_file" "$dest_service" + + # 原有的启用和启动命令 systemctl enable "$service_name" systemctl start "$service_name" fi done -pip install -r /usr/lib/euler-copilot-framework/mcp_center/requiremenets.txt -i https://pypi.tuna.tsinghua.edu.cn/simple +pip install -r /usr/lib/euler-copilot-framework/mcp_center/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple SCRIPT_PATH=$(cd $(dirname $0) && pwd) echo "当前脚本所在目录: $SCRIPT_PATH" diff --git a/mcp_center/servers/oe_cli_mcp_server/client/client.py b/mcp_center/servers/oe_cli_mcp_server/client/client.py index 3aa4b6c47640d9126afdd6635f9b055bac14d8f6..7d3f784cc38f9da715b648a53e309ad69b1c1d86 100644 --- a/mcp_center/servers/oe_cli_mcp_server/client/client.py +++ b/mcp_center/servers/oe_cli_mcp_server/client/client.py @@ -226,14 +226,29 @@ async def main() -> None: print(result) print("\n" + "="*60) - print("10. pkg_tool - 仅更新系统安全补丁(替代check-update)") + print("10. pkg_tool - 安装 nginx 包 + 验证安装结果") print("="*60) - # 工具无check-update枚举,替换为update-sec(安全更新) - result = await client.call_tool("pkg_tool", { - "action": "update-sec", - "yes": True + + # 步骤1:安装 nginx 包(双系统兼容,自动适配 apt/dnf) + print("正在安装 nginx 包...") + install_result = await client.call_tool("pkg_tool", { + "action": "install", # 安装动作(双系统兼容) + "pkg_name": "nginx", # 要安装的包名 + "yes": True # 自动确认安装(避免交互) }) - print(result) + print("安装执行结果:") + print(install_result) + + # 步骤2:验证安装结果(用 list 方法过滤 nginx 相关包) + print("\n" + "-"*40) + print("验证:查询已安装的 nginx 相关包") + print("-"*40) + verify_result = await client.call_tool("pkg_tool", { + "action": "list", # 列出已安装包 + "filter_key": "nginx" # 过滤关键词(只显示 nginx 相关) + }) + print("验证结果:") + print(verify_result) print("\n" + "="*60) print("11. pkg_tool - 清理 yum/dnf 包缓存(all类型)") @@ -281,7 +296,7 @@ async def main() -> None: # 工具无tree枚举,替换为restart实用场景 result = await client.call_tool("proc_tool", { "proc_actions": ["restart"], - "service_name": "ssh" # Ubuntu中sshd服务名为ssh + "service_name": "sshd" # openEuler中sshd服务名为ssh }) print(result) diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/base.py b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/base.py index 9e2c98c72a432870fdf4940fb3ca7638a82eae0e..ce5e1aa050162b770cbfc7d5e1ef72c6455cdc3a 100644 --- a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/base.py +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/base.py @@ -4,7 +4,6 @@ import shutil from enum import Enum from typing import Dict, List from pydantic import Field - from config.public.base_config_loader import BaseConfig, LanguageEnum # 初始化日志 diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/tool.py b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/tool.py index aa627d95f6e165cc6bfe73711c708dd24d47e518..9d1d1c1317a533608f569deef85aa291b15c2f92 100644 --- a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/tool.py +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/tool.py @@ -1,6 +1,6 @@ from typing import Dict, Optional from pydantic import Field -from config.public.base_config_loader import LanguageEnum, BaseConfig +from config.public.base_config_loader import LanguageEnum from servers.oe_cli_mcp_server.mcp_tools.base_tools.file_tool.base import ( init_result_dict, FileManager, diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/pkg_tool/base.py b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/pkg_tool/base.py new file mode 100644 index 0000000000000000000000000000000000000000..12a5afed4fe8b42a81d165180feccbb92e868ba6 --- /dev/null +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/pkg_tool/base.py @@ -0,0 +1,273 @@ +import logging +import os +import subprocess +import re +from enum import Enum +from typing import Dict, List, Optional +from pydantic import Field +from config.public.base_config_loader import BaseConfig, LanguageEnum + +# 初始化日志(保留基础配置) +logger = logging.getLogger("pkg_tool") +logger.setLevel(logging.INFO) +lang = BaseConfig().get_config().public_config.language + +# ========== 枚举类定义(不变) ========== +class PkgActionEnum(str, Enum): + LIST = "list" # 列出已安装包 + INFO = "info" # 查询包详情 + INSTALL = "install" # 在线安装 + LOCAL_INSTALL = "local-install" # 离线RPM安装 + UPDATE = "update" # 更新包(单个/所有) + UPDATE_SEC = "update-sec" # 仅更新安全补丁 + REMOVE = "remove" # 卸载包 + CLEAN = "clean" # 清理缓存 + +class PkgCacheTypeEnum(str, Enum): + ALL = "all" # 清理所有缓存 + PACKAGES = "packages" # 清理包缓存 + METADATA = "metadata" # 清理元数据缓存 + +# ========== 通用工具函数(精简) ========== +def get_language() -> bool: + """获取语言配置:True=中文,False=英文""" + return BaseConfig().get_config().public_config.language == LanguageEnum.ZH + +def init_result_dict( + target_host: str = "127.0.0.1", + result_type: str = "list", + include_pkg_name: bool = True +) -> Dict: + """初始化返回结果字典""" + result = { + "success": False, + "message": "", + "result": [] if result_type == "list" else "", + "target": target_host, + "pkg_name": "" + } + return result + +def parse_pkg_action(action_str: str) -> PkgActionEnum: + """解析字符串为PkgActionEnum""" + try: + return PkgActionEnum(action_str.strip().lower()) + except ValueError: + valid_values = [e.value for e in PkgActionEnum] + raise ValueError(f"无效的操作类型,可选枚举值:{','.join(valid_values)}") + +def parse_cache_type(cache_type_str: str) -> PkgCacheTypeEnum: + """解析字符串为PkgCacheTypeEnum""" + try: + return PkgCacheTypeEnum(cache_type_str.strip().lower()) + except ValueError: + valid_values = [e.value for e in PkgCacheTypeEnum] + raise ValueError(f"无效的缓存类型,可选枚举值:{','.join(valid_values)}") + +def safe_split_log(log): + # 兼容:字符串→splitlines(),其他类型(字典/None)→转为列表 + return log.splitlines() if isinstance(log, str) else [str(log) if log else "无执行日志"] + +# ========== 核心命令工具(极简实现,解决命令不存在问题) ========== +def get_cmd_path(cmd: str) -> Optional[str]: + """获取命令绝对路径(仅遍历OpenEuler常见路径)""" + common_paths = ["/usr/bin", "/bin", "/usr/sbin", "/sbin"] + for path in common_paths: + cmd_path = os.path.join(path, cmd) + if os.path.exists(cmd_path) and os.access(cmd_path, os.X_OK): + return cmd_path + logger.warning(f"命令不存在:{cmd}") + return None + +# ========== 包管理核心类(极简修复,易读易维护) ========== +class PackageManager: + """OpenEuler软件包管理类(精简命令调用,解决系统调用异常)""" + def __init__(self, lang: LanguageEnum = LanguageEnum.ZH): + self.is_zh = lang + # 提前获取命令绝对路径(避免重复查找) + self.dnf_path = get_cmd_path("dnf") or get_cmd_path("yum") # 兼容yum + self.rpm_path = get_cmd_path("rpm") + + def _msg(self, zh: str, en: str) -> str: + """简洁多语言提示""" + return zh if self.is_zh else en + + def _run_cmd(self, cmd: List[str]) -> Optional[str]: + """执行命令(确保返回字符串或None,不返回字典)""" + if not cmd: + logger.error(self._msg("命令不能为空", "Command cannot be empty")) + return None + + # 替换命令为绝对路径 + cmd_name = cmd[0] + if cmd_name == "dnf" and self.dnf_path: + cmd[0] = self.dnf_path + elif cmd_name == "rpm" and self.rpm_path: + cmd[0] = self.rpm_path + + try: + logger.info(f"执行命令:{' '.join(cmd)}") + result = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + check=True, + timeout=30 + ) + # 确保返回的是字符串(命令输出) + return result.stdout.strip() or "操作执行成功" + except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e: + err_msg = e.stderr.strip() or str(e) + logger.error(f"命令执行失败:{err_msg}") + return f"命令执行失败:{err_msg}" + except Exception as e: + logger.error(f"系统调用异常:{str(e)}") + return f"系统调用异常:{str(e)}" + # ---------- 核心功能(仅修复命令调用,不改动原有解析逻辑) ---------- + def list(self, filter_key: Optional[str] = Field(None, description="包名过滤关键词(可选)")) -> List[Dict]: + """列出已安装软件包""" + if not self.rpm_path: + return [{"error": self._msg("未找到rpm命令,无法列出包", "rpm command not found")}] + + cmd = [self.rpm_path, "-qa", "--queryformat", "%{NAME}\t%{VERSION}\t%{RELEASE}\t%{ARCH}\n"] + output = self._run_cmd(cmd) + if not output: + return [{"error": self._msg("列出包失败", "Failed to list packages")}] + + pkg_list = [] + for line in output.splitlines(): + if not line.strip(): + continue + name, version, release, arch = line.split("\t", 3) + if filter_key and filter_key.lower() not in name.lower(): + continue + pkg_list.append({ + "name": name, + "version": f"{version}-{release}", + "arch": arch, + "full_name": f"{name}-{version}-{release}.{arch}" + }) + return pkg_list + + def info(self, pkg_name: str = Field(..., description="包名(必填)")) -> Dict: + """查询软件包详情""" + if not pkg_name.strip(): + return {"error": self._msg("包名不能为空", "Package name cannot be empty")} + if not self.dnf_path: + return {"error": self._msg("未找到dnf/yum命令,无法查询包详情", "dnf/yum command not found")} + + cmd = [self.dnf_path, "info", pkg_name.strip()] + output = self._run_cmd(cmd) + if not output: + return {"error": self._msg(f"查询包{pkg_name}详情失败", f"Failed to get info for {pkg_name}")} + + # 原有解析逻辑不变 + info_patterns = { + "name": r"Name\s*:\s*(.+)", + "version": r"Version\s*:\s*(.+)", + "release": r"Release\s*:\s*(.+)", + "arch": r"Architecture\s*:\s*(.+)", + "installed_size": r"Installed Size\s*:\s*(.+)", + "repo": r"From Repository\s*:\s*(.+)", + "summary": r"Summary\s*:\s*(.+)", + "license": r"License\s*:\s*(.+)", + "url": r"URL\s*:\s*(.+)" + } + pkg_info = {} + for key, pattern in info_patterns.items(): + match = re.search(pattern, output, re.IGNORECASE) + if match: + pkg_info[key] = match.group(1).strip() + + return pkg_info if pkg_info else {"error": self._msg(f"未找到包{pkg_name}的信息", f"No info found for {pkg_name}")} + + def install(self, + pkg_name: str = Field(..., description="包名(必填)"), + yes: bool = Field(True, description="自动确认(默认True)")) -> Dict: + """在线安装软件包""" + if not pkg_name.strip(): + return {"error": self._msg("包名不能为空", "Package name cannot be empty")} + if not self.dnf_path: + return {"error": self._msg("未找到dnf/yum命令,无法安装包", "dnf/yum command not found")} + + cmd = [self.dnf_path, "install", pkg_name.strip()] + if yes: + cmd.append("-y") + output = self._run_cmd(cmd) + return {"message": output} if output else {"error": self._msg(f"安装包{pkg_name}失败", f"Failed to install {pkg_name}")} + + def local_install(self, + rpm_path: str = Field(..., description="RPM文件路径(必填)"), + yes: bool = Field(True, description="自动确认(默认True)")) -> Dict: + """离线安装RPM包""" + if not rpm_path.strip(): + return {"error": self._msg("RPM路径不能为空", "RPM path cannot be empty")} + if not os.path.exists(rpm_path.strip()): + return {"error": self._msg(f"RPM文件不存在:{rpm_path}", f"RPM file not found: {rpm_path}")} + if not self.dnf_path: + return {"error": self._msg("未找到dnf/yum命令,无法安装RPM", "dnf/yum command not found")} + + cmd = [self.dnf_path, "localinstall", rpm_path.strip()] + if yes: + cmd.append("-y") + output = self._run_cmd(cmd) + return {"message": output} if output else {"error": self._msg("安装RPM失败", "Failed to install RPM")} + + def update(self, + pkg_name: Optional[str] = Field(None, description="包名(为空则更新所有)"), + yes: bool = Field(True, description="自动确认(默认True)")) -> Dict: + """更新软件包""" + if not self.dnf_path: + return {"error": self._msg("未找到dnf/yum命令,无法更新包", "dnf/yum command not found")} + + cmd = [self.dnf_path, "update"] + if pkg_name: + cmd.append(pkg_name.strip()) + if yes: + cmd.append("-y") + output = self._run_cmd(cmd) + return {"message": output} if output else {"error": self._msg("更新包失败", "Failed to update package")} + + def update_sec(self, + yes: bool = Field(True, description="自动确认(默认True)")) -> Dict: + """仅更新安全补丁""" + if not self.dnf_path: + return {"error": self._msg("未找到dnf/yum命令,无法更新安全补丁", "dnf/yum command not found")} + + cmd = [self.dnf_path, "update", "--security"] + if yes: + cmd.append("-y") + output = self._run_cmd(cmd) + return {"message": output} if output else {"error": self._msg("更新安全补丁失败", "Failed to update security patches")} + + def remove(self, + pkg_name: str = Field(..., description="包名(必填)"), + yes: bool = Field(True, description="自动确认(默认True)")) -> Dict: + """卸载软件包""" + if not pkg_name.strip(): + return {"error": self._msg("包名不能为空", "Package name cannot be empty")} + if not self.dnf_path: + return {"error": self._msg("未找到dnf/yum命令,无法卸载包", "dnf/yum command not found")} + + cmd = [self.dnf_path, "remove", pkg_name.strip()] + if yes: + cmd.append("-y") + output = self._run_cmd(cmd) + return {"message": output} if output else {"error": self._msg(f"卸载包{pkg_name}失败", f"Failed to remove {pkg_name}")} + + def clean(self, + cache_type: PkgCacheTypeEnum = Field(PkgCacheTypeEnum.ALL, description="缓存类型(默认all)")) -> Dict: + """清理缓存(修复结果处理逻辑)""" + if not self.dnf_path: + return {"error": self._msg("未找到dnf/yum命令,无法清理缓存", "dnf/yum command not found")} + + cmd = [self.dnf_path, "clean", cache_type.value] + output = self._run_cmd(cmd) # 此时 output 是字符串或None + + # 直接返回结果,不调用 splitlines()(或只在 output 是字符串时调用) + if output and "失败" not in output: + return {"message": self._msg(f"缓存清理成功(类型:{cache_type.value})", f"Cache cleaned successfully (type: {cache_type.value})")} + else: + return {"error": self._msg(f"缓存清理失败:{output or '未知错误'}", f"Failed to clean cache: {output or 'Unknown error'}")} + diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/pkg_tool/config.json b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/pkg_tool/config.json new file mode 100644 index 0000000000000000000000000000000000000000..3b15b022708312343aa2c557db0ca7c6e676bcff --- /dev/null +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/pkg_tool/config.json @@ -0,0 +1,8 @@ +{ + "tools": { + "pkg_tool": { + "zh": "【OpenEuler软件包管理工具】\n功能:支持OpenEuler系统软件包的查、装、更、卸、缓存清理(基于dnf/rpm命令,安全调用)\n\n【核心提示】\n1. 操作类型(action)必须从以下枚举值中选择:list/info/install/local-install/update/update-sec/remove/clean,不可传入其他值;\n2. 缓存类型(cache_type)必须从以下枚举值中选择:all/packages/metadata,clean操作默认值为all;\n3. 不同操作类型(action)对应不同必填参数,未满足则执行失败,请严格遵守:\n - list(列出已安装包):可选传入filter_key(包名过滤关键词),无必填参数;\n - info(查询包详情)/install(在线安装)/update(更新包)/remove(卸载包):必须传入pkg_name(包名);\n - local-install(离线安装):必须传入rpm_path(RPM文件绝对路径);\n - update-sec(仅更新安全补丁)/clean(清理缓存):无额外必填参数;\n4. 非必填参数默认值:yes(自动确认)=True,lang(语言)=ZH;\n5. 路径格式要求:rpm_path需传入绝对路径(如\"/tmp/nginx-1.24.0-1.el9.x86_64.rpm\"),避免相对路径导致的找不到文件问题;\n6. 权限说明:安装/更新/卸载/清理缓存操作需root权限,否则会执行失败。\n\n【枚举类定义(必须遵守)】\n- PkgActionEnum(操作类型枚举):list / info / install / local-install / update / update-sec / remove / clean\n- PkgCacheTypeEnum(缓存类型枚举):all / packages / metadata\n\n【参数详情】\n- action:操作类型(必填,枚举值见上方)\n- pkg_name:包名(info/install/update/remove操作必填)\n- filter_key:包名过滤关键词(list操作可选,模糊匹配)\n- rpm_path:RPM文件路径(local-install操作必填,绝对路径)\n- cache_type:缓存类型(clean操作可选,枚举值见上方,默认all)\n- yes:自动确认操作(install/update/remove/clean默认True,无需手动输入y)\n\n【返回值说明】\n- success:执行结果(True=成功,False=失败)\n- message:执行信息/错误提示(多语言)\n- result:操作结果(list/info返回结构化列表,install/update等返回命令日志列表)\n- pkg_name:操作的包名(无包名操作返回空字符串)\n- target:执行目标(固定为127.0.0.1,本地执行)", + "en": "【OpenEuler Package Management Tool】\nFunction: Supports query/install/update/remove/cache clean for OpenEuler packages (based on dnf/rpm commands, safe call)\n\n【Core Guidelines】\n1. Operation type (action) must be selected from the following enum values: list/info/install/local-install/update/update-sec/remove/clean, other values are not allowed;\n2. Cache type (cache_type) must be selected from the following enum values: all/packages/metadata, default value for clean is all;\n3. Different action types correspond to different required parameters, execution will fail if not met, please strictly follow:\n - list (list installed packages): Optionally pass filter_key (package name filter), no required parameters;\n - info (query package details)/install (online install)/update (update package)/remove (uninstall package): Must pass pkg_name (package name);\n - local-install (offline RPM install): Must pass rpm_path (absolute path of RPM file);\n - update-sec (update security patches only)/clean (clean cache): No additional required parameters;\n4. Default values for optional parameters: yes (auto confirm)=True, lang (language)=ZH;\n5. Path format requirement: rpm_path must be absolute path (e.g.\"/tmp/nginx-1.24.0-1.el9.x86_64.rpm\"), avoid file not found due to relative path;\n6. Permission note: Install/update/remove/clean operations require root privileges, otherwise execution will fail.\n\n【Enum Class Definition (Must Follow)】\n- PkgActionEnum (Operation Type Enum): list / info / install / local-install / update / update-sec / remove / clean\n- PkgCacheTypeEnum (Cache Type Enum): all / packages / metadata\n\n【Parameter Details】\n- action: Operation type (required, enum values see above)\n- pkg_name: Package name (required for info/install/update/remove)\n- filter_key: Package name filter (optional for list, fuzzy match)\n- rpm_path: RPM file path (required for local-install, absolute path)\n- cache_type: Cache type (optional for clean, enum values see above, default all)\n- yes: Auto confirm operation (default True for install/update/remove/clean, no need to input y manually)\n \n【Return Value Explanation】\n- success: Execution result (True=success, False=failure)\n- message: Execution info/error prompt (multilingual)\n- result: Operation result (structured list for list/info, command log list for install/update etc.)\n- pkg_name: Operated package name (empty string for operations without package name)\n- target: Execution target (fixed as 127.0.0.1, local execution)" + } + } +} \ No newline at end of file diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/pkg_tool/deps.toml b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/pkg_tool/deps.toml new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/pkg_tool/tool.py b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/pkg_tool/tool.py new file mode 100644 index 0000000000000000000000000000000000000000..ae22fd96f719a0440dc8750909ae5d03bae5cb71 --- /dev/null +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/pkg_tool/tool.py @@ -0,0 +1,115 @@ +from typing import Dict, Optional +from pydantic import Field +from config.public.base_config_loader import LanguageEnum +from servers.oe_cli_mcp_server.mcp_tools.base_tools.pkg_tool.base import ( + init_result_dict, + PackageManager, + PkgActionEnum, + PkgCacheTypeEnum, + parse_pkg_action, + parse_cache_type, + logger, lang, safe_split_log +) + +def pkg_tool( + action: str = Field( + ..., + description="操作类型(枚举值:list/info/install/local-install/update/update-sec/remove/clean)" + ), + pkg_name: Optional[str] = Field(None, description="包名(info/install/update/remove必填)"), + filter_key: Optional[str] = Field(None, description="包名过滤关键词(list操作可选)"), + rpm_path: Optional[str] = Field(None, description="RPM文件路径(local-install必填)"), + cache_type: str = Field( + PkgCacheTypeEnum.ALL.value, + description="缓存类型(枚举值:all/packages/metadata,clean操作默认all)" + ), + yes: bool = Field(True, description="自动确认(install/update/remove默认True)"), +) -> Dict: + """ + OpenEuler软件包管理工具(枚举化参数,适配大模型识别) + """ + # 初始化结果字典 + result = init_result_dict() + result["pkg_name"] = pkg_name.strip() if pkg_name else "" + pm = PackageManager(lang=lang) + is_zh = lang == LanguageEnum.ZH + + # 1. 枚举值解析(兼容大模型字符串输入) + try: + action_enum = parse_pkg_action(action) + cache_type_enum = parse_cache_type(cache_type) + except ValueError as e: + result["message"] = str(e) if is_zh else str(e) + return result + + # 2. 核心参数校验 + try: + if action_enum == PkgActionEnum.LIST: + # 列出已安装包 + result["result"] = pm.list(filter_key=filter_key) + result["success"] = True + filter_desc = filter_key or "无" if is_zh else filter_key or "none" + result["message"] = f"已安装包列表查询完成(过滤关键词:{filter_desc})" if is_zh else f"Installed packages listed (filter: {filter_desc})" + + elif action_enum == PkgActionEnum.INFO: + # 查询包详情 + if not pkg_name: + raise ValueError("包名不能为空" if is_zh else "Package name cannot be empty") + result["result"] = [pm.info(pkg_name.strip())] # 统一列表格式 + result["success"] = True + result["message"] = f"包{pkg_name}详情查询完成" if is_zh else f"Package {pkg_name} info queried" + + elif action_enum == PkgActionEnum.INSTALL: + # 在线安装 + if not pkg_name: + raise ValueError("包名不能为空" if is_zh else "Package name cannot be empty") + log = pm.install(pkg_name.strip(), yes=yes) + result["success"] = True + result["message"] = f"包{pkg_name}在线安装成功" if is_zh else f"Package {pkg_name} installed successfully" + result["result"] = safe_split_log(log) + + elif action_enum == PkgActionEnum.LOCAL_INSTALL: + # 离线安装RPM + if not rpm_path: + raise ValueError("RPM路径不能为空" if is_zh else "RPM path cannot be empty") + log = pm.local_install(rpm_path.strip(), yes=yes) + result["success"] = True + result["message"] = f"RPM包{rpm_path}安装成功" if is_zh else f"RPM package {rpm_path} installed successfully" + result["result"] = safe_split_log(log) + + elif action_enum == PkgActionEnum.UPDATE: + # 更新包 + log = pm.update(pkg_name=pkg_name.strip() if pkg_name else None, yes=yes) + target = pkg_name or "所有包" if is_zh else pkg_name or "all packages" + result["success"] = True + result["message"] = f"{target}更新完成" if is_zh else f"{target} updated successfully" + result["result"] = safe_split_log(log) + + elif action_enum == PkgActionEnum.UPDATE_SEC: + # 仅更新安全补丁 + log = pm.update_sec(yes=yes) + result["success"] = True + result["message"] = "安全补丁更新完成" if is_zh else "Security patches updated successfully" + result["result"] = safe_split_log(log) + + elif action_enum == PkgActionEnum.REMOVE: + # 卸载包 + if not pkg_name: + raise ValueError("包名不能为空" if is_zh else "Package name cannot be empty") + log = pm.remove(pkg_name.strip(), yes=yes) + result["success"] = True + result["message"] = f"包{pkg_name}卸载完成" if is_zh else f"Package {pkg_name} removed successfully" + result["result"] = safe_split_log(log) + + elif action_enum == PkgActionEnum.CLEAN: + # 清理缓存 + log = pm.clean(cache_type=cache_type_enum) + result["success"] = True + result["message"] = f"{cache_type_enum.value}缓存清理完成" if is_zh else f"{cache_type_enum.value} cache cleaned successfully" + result["result"] = safe_split_log(log) + + except Exception as e: + result["message"] = f"操作失败:{str(e)}" if is_zh else f"Operation failed: {str(e)}" + logger.error(f"Package manager error: {str(e)}") + + return result \ No newline at end of file diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/proc_tool/base.py b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/proc_tool/base.py new file mode 100644 index 0000000000000000000000000000000000000000..7b3c1e23f9511c9c5a212082ea62f1cdaaedab20 --- /dev/null +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/proc_tool/base.py @@ -0,0 +1,240 @@ +import logging +import os +import subprocess +import re +from enum import Enum +from typing import Dict, List, Optional, Union +from config.public.base_config_loader import LanguageEnum, BaseConfig + +# 初始化日志(仅保留基础配置) +logger = logging.getLogger("proc_tool") +logger.setLevel(logging.INFO) + +# ========== 枚举类定义(不变,保留核心操作) ========== +class ProcActionEnum(str, Enum): + LIST = "list" # 所有进程 + FIND = "find" # 按名称/PID查询 + STAT = "stat" # 进程资源占用 + START = "start" # 启动systemd服务 + RESTART = "restart" # 重启systemd服务 + STOP = "stop" # 停止systemd服务 + KILL = "kill" # 强制终止进程 + +# ========== 通用工具函数(精简,保留必要功能) ========== +def get_language() -> LanguageEnum: + return BaseConfig().get_config().public_config.language + +def is_zh() -> bool: + return get_language() == LanguageEnum.ZH + +def init_result_dict(target_host: str = "127.0.0.1") -> Dict: + """初始化返回结果(简洁格式)""" + return { + "success": False, + "message": "", + "result": {}, + "target": target_host, + "proc_actions": [] + } + +def parse_proc_actions(action_list: List[str]) -> List[ProcActionEnum]: + """解析操作类型(仅保留必要校验)""" + valid_values = [e.value for e in ProcActionEnum] + if not action_list: + msg = f"进程操作列表不能为空,可选值:{','.join(valid_values)}" + raise ValueError(msg) + + parsed = [] + for action in action_list: + action = action.strip().lower() + if action not in valid_values: + msg = f"无效操作:{action},可选值:{','.join(valid_values)}" + raise ValueError(msg) + parsed.append(ProcActionEnum(action)) + return parsed + +# ========== 核心工具函数(精简,无复杂嵌套) ========== +def get_cmd_path(cmd: str) -> Optional[str]: + """获取命令绝对路径(简洁实现,避免复杂逻辑)""" + # 优先遍历常见路径(OpenEuler默认路径) + common_paths = ["/usr/bin", "/bin", "/usr/sbin", "/sbin"] + for path in common_paths: + cmd_path = os.path.join(path, cmd) + if os.path.exists(cmd_path) and os.access(cmd_path, os.X_OK): + return cmd_path + # 路径不存在返回None + logger.warning(f"命令不存在:{cmd}(未在常见路径中找到)") + return None + +def run_cmd(cmd: List[str]) -> Optional[str]: + """执行命令(精简异常处理,只捕获关键错误)""" + cmd_path = get_cmd_path(cmd[0]) + if not cmd_path: + return None # 命令不存在直接返回None + cmd[0] = cmd_path # 替换为绝对路径 + + try: + logger.info(f"执行命令:{' '.join(cmd)}") + result = subprocess.run( + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + text=True, check=True, timeout=10 + ) + return result.stdout.strip() + except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e: + logger.error(f"命令执行失败:{e.stderr.strip() or str(e)}") + return None + except Exception as e: + logger.error(f"系统调用异常:{str(e)}") + return None + +# ========== 进程管理核心类(极简实现,易阅读) ========== +class ProcessManager: + def __init__(self): + self.zh = is_zh() # 提前判断语言,避免重复调用 + + def _msg(self, zh: str, en: str) -> str: + """简洁多语言提示(直接返回对应语言)""" + return zh if self.zh else en + + # ---------- 查:进程查询(核心功能,无多余嵌套) ---------- + def list_all_procs(self) -> List[Dict]: + """列出所有进程(精简解析逻辑)""" + output = run_cmd(["ps", "aux"]) + if not output: + return [{"error": self._msg("ps命令执行失败", "ps command failed")}] + + lines = [line.strip() for line in output.splitlines() if line.strip()] + if len(lines) < 2: + return [{"error": self._msg("无有效进程信息", "No valid process info")}] + + # 解析表头和数据 + headers = [h.lower() for h in lines[0].split()] + procs = [] + for line in lines[1:]: + parts = re.split(r"\s+", line, maxsplit=len(headers)-1) + proc = dict(zip(headers, parts)) + # 只转换关键数字字段(避免复杂处理) + for key in ["pid", "cpu", "mem"]: + if key in proc: + proc[key] = float(proc[key]) if "." in proc[key] else int(proc[key]) + procs.append(proc) + return procs + + def find_proc(self, name: Optional[str] = None, pid: Optional[int] = None) -> List[Dict]: + """按名称/PID查询(避免管道符,简洁实现)""" + if not (name or pid): + return [{"error": self._msg("必须指定进程名称或PID", "Must specify proc name or PID")}] + + # 按PID查询 + if pid: + output = run_cmd(["ps", "aux", "-p", str(pid)]) + return self._parse_ps_output(output) if output else [] + # 按名称查询(用ps+字符串过滤,避免grep管道) + else: + output = run_cmd(["ps", "aux"]) + if not output: + return [{"error": self._msg("查询进程失败", "Failed to find process")}] + procs = self._parse_ps_output(output) + # 过滤目标进程(排除grep自身) + return [p for p in procs if name.lower() in p.get("command", "").lower() and "grep" not in p.get("command", "")] + + def get_proc_stat(self, pid: int) -> Dict: + """获取进程资源占用(精简参数校验和解析)""" + if not isinstance(pid, int) or pid <= 0: + return {"error": self._msg("PID必须为正整数", "PID must be positive integer")} + + output = run_cmd(["ps", "-p", str(pid), "-o", "pid,%cpu,%mem,rss,vsz,etime"]) + if not output: + return {"error": self._msg(f"PID {pid} 不存在或查询失败", f"PID {pid} not found or query failed")} + + lines = [line.strip() for line in output.splitlines() if line.strip()] + if len(lines) < 2: + return {"error": self._msg(f"PID {pid} 不存在", f"PID {pid} does not exist")} + + # 解析状态信息 + headers = [h.lower().replace("%", "pct") for h in lines[0].split()] + parts = re.split(r"\s+", lines[1]) + stat = dict(zip(headers, parts)) + # 转换数字字段 + for key in ["pid", "cpupct", "mempct", "rss", "vsz"]: + if key in stat: + stat[key] = float(stat[key]) if "." in stat[key] else int(stat[key]) + return stat + + # ---------- 启/停/重启:systemd服务操作(简洁实现) ---------- + def _service_op(self, action: str, service_name: str) -> Dict: + """统一处理systemd服务操作(减少重复代码)""" + if not service_name: + return {"error": self._msg("必须指定服务名称", "Must specify service name")} + + output = run_cmd(["systemctl", action, service_name]) + if output is not None: + return {"message": self._msg(f"服务 {service_name} {action} 成功", f"Service {service_name} {action} success")} + else: + return {"error": self._msg(f"服务 {service_name} {action} 失败", f"Service {service_name} {action} failed")} + + def start_proc(self, service_name: str) -> Dict: + return self._service_op("start", service_name) + + def restart_proc(self, service_name: str) -> Dict: + return self._service_op("restart", service_name) + + def stop_proc(self, service_name: str) -> Dict: + return self._service_op("stop", service_name) + + # ---------- 强制终止进程(简洁实现) ---------- + def kill_proc(self, pid: int) -> Dict: + if not isinstance(pid, int) or pid <= 0: + return {"error": self._msg("PID必须为正整数", "PID must be positive integer")} + + # 先尝试正常终止,失败则强制终止(无嵌套try) + output = run_cmd(["kill", str(pid)]) + if output is not None: + return {"message": self._msg(f"已向PID {pid} 发送终止信号", f"Termination signal sent to PID {pid}")} + + output = run_cmd(["kill", "-9", str(pid)]) + if output is not None: + return {"message": self._msg(f"已强制终止PID {pid}", f"Forcibly terminated PID {pid}")} + else: + return {"error": self._msg(f"终止PID {pid} 失败", f"Failed to terminate PID {pid}")} + + # ---------- 辅助函数(精简) ---------- + def _parse_ps_output(self, output: str) -> List[Dict]: + """解析ps输出(无多余逻辑)""" + if not output: + return [] + lines = [line.strip() for line in output.splitlines() if line.strip()] + if len(lines) < 2: + return [] + headers = [h.lower() for h in lines[0].split()] + return [dict(zip(headers, re.split(r"\s+", line, maxsplit=len(headers)-1))) for line in lines[1:]] + + # ---------- 批量执行(精简校验和执行逻辑) ---------- + def exec_batch( + self, + actions: List[ProcActionEnum], + proc_name: Optional[str] = None, + pid: Optional[int] = None, + service_name: Optional[str] = None + ) -> Dict[str, Union[List[Dict], Dict]]: + batch_result = {} + # 操作映射(减少if-else) + action_map = { + ProcActionEnum.LIST: self.list_all_procs, + ProcActionEnum.FIND: lambda: self.find_proc(name=proc_name, pid=pid), + ProcActionEnum.STAT: lambda: self.get_proc_stat(pid=pid), + ProcActionEnum.START: lambda: self.start_proc(service_name=service_name), + ProcActionEnum.RESTART: lambda: self.restart_proc(service_name=service_name), + ProcActionEnum.STOP: lambda: self.stop_proc(service_name=service_name), + ProcActionEnum.KILL: lambda: self.kill_proc(pid=pid) + } + + # 执行每个操作(无复杂嵌套) + for action in actions: + try: + batch_result[action.value] = action_map[action]() + except Exception as e: + batch_result[action.value] = {"error": str(e)} + logger.error(f"操作 {action.value} 失败:{str(e)}") + + return batch_result \ No newline at end of file diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/proc_tool/config.json b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/proc_tool/config.json new file mode 100644 index 0000000000000000000000000000000000000000..978cd9ae9895510bfe7d7fd7b17c9c38d1439af2 --- /dev/null +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/proc_tool/config.json @@ -0,0 +1,8 @@ +{ + "tools": { + "proc_tool": { + "zh": "【进程管理工具】\n功能:支持进程的查看、启动、停止、强制终止等管理操作(基于ps/systemctl/kill命令)\n\n【核心提示】\n1. 操作类型(proc_actions)为枚举值列表,支持多选,可选枚举值及对应参数:\n - 查类:\n · list:所有进程(无需额外参数)\n · find:按名称/PID查询(需传proc_name或pid)\n · stat:进程资源占用(需传pid)\n - 启类:\n · start:启动服务(需传service_name,对应systemd服务名)\n · restart:重启服务(需传service_name)\n - 停类:\n · stop:停止服务(需传service_name)\n · kill:强制终止进程(需传pid)\n2. 输入格式示例:\n - 批量查询:[\"list\", \"find\"] + proc_name=\"nginx\"\n - 启停服务:[\"stop\", \"start\"] + service_name=\"nginx\"\n3. 语言配置:自动读取系统全局配置,不提供外部接口;\n4. 权限说明:启停/终止进程需root权限。\n\n【枚举类定义(必须遵守)】\n- ProcActionEnum(进程操作枚举):list / find / stat / start / restart / stop / kill\n\n【参数详情】\n- proc_actions:进程操作列表(必填,枚举值见上方)\n- proc_name:进程名称(find操作必填)\n- pid:进程PID(find/stat/kill操作必填,正整数)\n- service_name:服务名称(start/restart/stop操作必填,对应systemd服务名)\n\n【返回值说明】\n- success:操作结果(True=成功,False=失败)\n- message:操作信息/错误提示(自动切换中英文)\n- result:操作结果(查类返回结构化列表/字典,启/停类返回命令日志)\n- target:执行目标(固定127.0.0.1)\n- proc_actions:已执行的操作列表", + "en": "【Process Management Tool】\nFunction: Supports process management (view/start/stop/force terminate) via ps/systemctl/kill commands\n\n【Core Guidelines】\n1. Proc actions (proc_actions) is an enum list, supports multiple selection. Enum values and required parameters:\n - Query:\n · list: All processes (no extra params)\n · find: Query by name/PID (requires proc_name or pid)\n · stat: Process resource usage (requires pid)\n - Start:\n · start: Start service (requires service_name, systemd service name)\n · restart: Restart service (requires service_name)\n - Stop:\n · stop: Stop service (requires service_name)\n · kill: Force terminate process (requires pid)\n2. Input examples:\n - Batch query: [\"list\", \"find\"] + proc_name=\"nginx\"\n - Start/stop service: [\"stop\", \"start\"] + service_name=\"nginx\"\n3. Language config: Auto-read global system config, no external interface;\n4. Permission note: Start/stop/terminate requires root privileges.\n\n【Enum Class Definition (Must Follow)】\n- ProcActionEnum (Proc Action Enum): list / find / stat / start / restart / stop / kill\n\n【Parameter Details】\n- proc_actions: Proc action list (required, enum values see above)\n- proc_name: Proc name (required for find)\n- pid: Proc PID (required for find/stat/kill, positive integer)\n- service_name: Service name (required for start/restart/stop, systemd service name)\n\n【Return Value Explanation】\n- success: Operation result (True=success, False=failure)\n- message: Operation info/error prompt (auto-switch between Chinese/English)\n- result: Operation result (query returns structured list/dict, start/stop returns command log)\n- target: Execution target (fixed 127.0.0.1)\n- proc_actions: Executed action list" + } + } +} \ No newline at end of file diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/proc_tool/deps.toml b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/proc_tool/deps.toml new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/proc_tool/tool.py b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/proc_tool/tool.py new file mode 100644 index 0000000000000000000000000000000000000000..82ccbc6847cddb79cd6af786e22a06fb2823a7f0 --- /dev/null +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/proc_tool/tool.py @@ -0,0 +1,56 @@ +from typing import Dict, List, Optional +from pydantic import Field +from servers.oe_cli_mcp_server.mcp_tools.base_tools.proc_tool.base import ( + init_result_dict, + ProcessManager, + parse_proc_actions, + is_zh, + logger +) + +def proc_tool( + proc_actions: List[str] = Field( + ..., + description="进程操作列表(枚举值:list/find/stat/start/restart/stop/kill,支持多选)" + ), + proc_name: Optional[str] = Field(None, description="进程名称(find操作必填)"), + pid: Optional[int] = Field(None, description="进程PID(find/stat/kill操作必填)"), + service_name: Optional[str] = Field(None, description="服务名称(start/restart/stop操作必填)") +) -> Dict: + """ + 进程管理工具(支持查/启/停批量操作) + 语言配置从全局BaseConfig读取,不提供外部接口 + """ + # 初始化结果字典 + result = init_result_dict() + proc_mgr = ProcessManager() + use_zh = is_zh() + + # 解析操作类型列表 + try: + parsed_actions = parse_proc_actions(proc_actions) + result["proc_actions"] = [a.value for a in parsed_actions] + except ValueError as e: + result["message"] = str(e) + return result + + # 执行批量操作 + try: + batch_result = proc_mgr.exec_batch( + actions=parsed_actions, + proc_name=proc_name, + pid=pid, + service_name=service_name + ) + result["result"] = batch_result + result["success"] = True + + # 生成提示信息 + action_str = ",".join(result["proc_actions"]) + result["message"] = f"以下进程操作执行完成:{action_str}" if use_zh else f"Proc actions executed: {action_str}" + + except Exception as e: + result["message"] = f"进程操作失败:{str(e)}" if use_zh else f"Proc action failed: {str(e)}" + logger.error(f"Proc manager error: {str(e)}") + + return result \ No newline at end of file diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/base.py b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/base.py index 15fb85dd53743d2d0539ec546ca1f456bc41324b..c7c344504e4b7611e7a4b49136309cd347b01a75 100644 --- a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/base.py +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/base.py @@ -5,7 +5,6 @@ import subprocess import re from enum import Enum from typing import Dict, List, Optional - from config.public.base_config_loader import LanguageEnum, BaseConfig # 初始化日志