diff --git a/scripts/deploy/4-other-script/agent_manager.py b/scripts/deploy/4-other-script/agent_manager.py index 15d001f07678f0bd65016532c1426228668434ab..609574382b4e26b12873f98c1db19750cc97ce8a 100644 --- a/scripts/deploy/4-other-script/agent_manager.py +++ b/scripts/deploy/4-other-script/agent_manager.py @@ -1,3 +1,11 @@ +""" +MCP服务器管理工具 + +用于管理MCP服务器的创建、配置和部署的工具。 +""" + +from __future__ import annotations + import argparse import asyncio import json @@ -6,10 +14,11 @@ import os import shutil import sys from enum import Enum -from typing import Any, Optional +from pathlib import Path +from typing import Any import aiohttp -from aiohttp import ClientError, ClientResponseError +from aiohttp import ClientError, ClientResponseError, ClientTimeout from pydantic import BaseModel, Field, ValidationError # 配置日志系统 @@ -27,6 +36,7 @@ REQUEST_TIMEOUT = 30 # 请求超时时间(秒) MAX_RETRY_COUNT = 3 # API请求重试次数 RETRY_DELAY = 2 # 重试延迟(秒) SERVICE_WAIT_TIMEOUT = 60 # 服务等待超时时间(秒) +HTTP_OK = 200 # HTTP 成功状态码 class AppType(str, Enum): @@ -97,6 +107,14 @@ class ApiClient: """API请求客户端封装""" def __init__(self, base_url: str, *, verify_ssl: bool = False) -> None: + """ + 初始化API客户端。 + + Args: + base_url: API的基础URL + verify_ssl: 是否验证SSL证书 + + """ self.base_url = base_url connector = aiohttp.TCPConnector(ssl=verify_ssl) self.session = aiohttp.ClientSession(connector=connector) @@ -105,7 +123,7 @@ class ApiClient: """关闭客户端会话""" await self.session.close() - async def request(self, method: str, path: str, **kwargs) -> dict[str, Any]: + async def request(self, method: str, path: str, **kwargs: Any) -> dict[str, Any]: """ 带重试机制的异步API请求 @@ -124,16 +142,22 @@ class ApiClient: url = f"{self.base_url}{path}" for retry in range(MAX_RETRY_COUNT): try: - async with self.session.request(method, url, timeout=REQUEST_TIMEOUT, **kwargs) as response: + async with self.session.request( + method, + url, + timeout=ClientTimeout(total=REQUEST_TIMEOUT), + **kwargs, + ) as response: response.raise_for_status() return await response.json() except (ClientResponseError, ClientError) as e: - logger.warning(f"API请求失败(第{retry + 1}/{MAX_RETRY_COUNT}次) - {method} {url}: {str(e)}") + logger.warning("API请求失败(第%d/%d次) - %s %s: %s", retry + 1, MAX_RETRY_COUNT, method, url, e) if retry < MAX_RETRY_COUNT - 1: await asyncio.sleep(RETRY_DELAY) - raise RuntimeError(f"API请求多次失败: {method} {url}") + msg = f"API请求多次失败: {method} {url}" + raise RuntimeError(msg) def copy_folder(src_dir: str, dest_dir: str) -> None: @@ -149,28 +173,32 @@ def copy_folder(src_dir: str, dest_dir: str) -> None: RuntimeError: 复制过程中发生错误 """ - if not os.path.isdir(src_dir): - raise NotADirectoryError(f"源路径 {src_dir} 不是一个有效的文件夹") + if not Path(src_dir).is_dir(): + msg = f"源路径 {src_dir} 不是一个有效的文件夹" + raise NotADirectoryError(msg) - src_folder_name = os.path.basename(src_dir) - dest_full_path = os.path.join(dest_dir, src_folder_name) + src_path = Path(src_dir) + src_folder_name = src_path.name + dest_full_path = Path(dest_dir) / src_folder_name try: - os.makedirs(dest_full_path, exist_ok=True) - for item in os.listdir(src_dir): - src_item = os.path.join(src_dir, item) - dest_item = os.path.join(dest_full_path, item) + dest_full_path.mkdir(parents=True, exist_ok=True) + for item in src_path.iterdir(): + src_item = item + dest_item = dest_full_path / item.name - if os.path.isdir(src_item): - copy_folder(src_item, dest_full_path) + if src_item.is_dir(): + copy_folder(str(src_item), str(dest_full_path)) else: - shutil.copy2(src_item, dest_item) - logger.debug(f"已复制文件: {src_item} -> {dest_item}") + shutil.copy2(str(src_item), str(dest_item)) + logger.debug("已复制文件: %s -> %s", src_item, dest_item) - except PermissionError: - raise RuntimeError(f"权限不足,无法操作文件: {src_item}") - except Exception as e: - raise RuntimeError(f"复制文件夹失败: {str(e)}") + except PermissionError as e: + msg = f"权限不足,无法操作文件: {src_dir}" + raise RuntimeError(msg) from e + except OSError as e: + msg = f"复制文件夹失败: {e!s}" + raise RuntimeError(msg) from e def get_config(config_path: str) -> dict: @@ -189,23 +217,27 @@ def get_config(config_path: str) -> dict: RuntimeError: 读取文件失败 """ - if not os.path.exists(config_path): - raise FileNotFoundError(f"配置文件不存在: {config_path}") + if not Path(config_path).exists(): + msg = f"配置文件不存在: {config_path}" + raise FileNotFoundError(msg) try: - with open(config_path, "r", encoding="utf-8") as reader: + with Path(config_path).open(encoding="utf-8") as reader: config = json.load(reader) if not isinstance(config, dict): - raise ValueError("配置文件内容必须为JSON对象") - - logger.info(f"成功加载配置文件: {config_path}") - return config + msg = "配置文件内容必须为JSON对象" + raise TypeError(msg) from None + logger.info("成功加载配置文件: %s", config_path) except json.JSONDecodeError as e: - raise ValueError(f"配置文件格式错误: {str(e)}") - except Exception as e: - raise RuntimeError(f"读取配置文件失败: {str(e)}") + msg = f"配置文件格式错误: {e!s}" + raise ValueError(msg) from e + except OSError as e: + msg = f"读取配置文件失败: {e!s}" + raise RuntimeError(msg) from e + else: + return config async def wait_for_mcp_service(api_client: ApiClient, service_id: str) -> dict[str, Any]: @@ -223,38 +255,63 @@ async def wait_for_mcp_service(api_client: ApiClient, service_id: str) -> dict[s RuntimeError: 服务超时未就绪 """ - logger.info(f"等待MCP服务就绪: {service_id}") + logger.info("等待MCP服务就绪: %s", service_id) for elapsed in range(SERVICE_WAIT_TIMEOUT): try: service = await query_mcp_server(api_client, service_id) if service and service.get("status") == "ready": - logger.info(f"MCP服务 {service_id} 已就绪 (耗时 {elapsed} 秒)") + logger.info("MCP服务 %s 已就绪 (耗时 %d 秒)", service_id, elapsed) return service await asyncio.sleep(1) - except Exception as e: - logger.warning(f"查询服务状态失败: {str(e)},将继续等待") + except (ClientError, ClientResponseError, RuntimeError) as e: + logger.warning("查询服务状态失败: %s,将继续等待", e) - raise RuntimeError(f"MCP服务 {service_id} 等待超时 ({SERVICE_WAIT_TIMEOUT}秒) 未就绪") + msg = f"MCP服务 {service_id} 等待超时 ({SERVICE_WAIT_TIMEOUT}秒) 未就绪" + raise RuntimeError(msg) async def delete_mcp_server(api_client: ApiClient, server_id: str) -> dict[str, Any]: - """删除mcp服务""" - logger.info(f"删除MCP服务: {server_id}") + """ + 删除MCP服务。 + + Args: + api_client: API客户端实例 + server_id: 要删除的服务ID + + Returns: + 删除操作的响应 + + """ + logger.info("删除MCP服务: %s", server_id) return await api_client.request("DELETE", f"/api/mcp/{server_id}") async def create_mcp_server(api_client: ApiClient, mcp_config: dict) -> str: - """创建或更新mcp服务状态""" + """ + 创建或更新MCP服务状态。 + + Args: + api_client: API客户端实例 + mcp_config: MCP服务配置 + + Returns: + 创建的服务ID + + Raises: + RuntimeError: 创建失败时 + + """ logger.info("创建MCP服务") response = await api_client.request("POST", "/api/mcp/", json=mcp_config) service_id = response.get("result", {}).get("serviceId") if not service_id: - raise RuntimeError("创建MCP服务未返回有效的serviceId") + msg = "创建MCP服务未返回有效的serviceId" + raise RuntimeError(msg) - logger.info(f"MCP服务创建成功,service_id: {service_id}") + logger.info("MCP服务创建成功,service_id: %s", service_id) return service_id @@ -278,7 +335,7 @@ async def process_mcp_config(api_client: ApiClient, config_path: str) -> str: await delete_mcp_server(api_client, config["serviceId"]) del config["serviceId"] logger.info("已删除旧的MCP服务ID") - except Exception: + except (ClientError, ClientResponseError, RuntimeError): logger.exception("删除旧MCP服务失败(可能不存在),继续创建新服务") # 创建新服务 @@ -287,28 +344,33 @@ async def process_mcp_config(api_client: ApiClient, config_path: str) -> str: # 保存更新后的配置文件 try: config["serviceId"] = server_id - with open(config_path, "w", encoding="utf-8") as writer: - json.dump(config, writer, ensure_ascii=False, indent=4) - logger.info(f"配置文件已更新: {config_path}") - return server_id + def write_config() -> None: + with Path(config_path).open("w", encoding="utf-8") as writer: + json.dump(config, writer, ensure_ascii=False, indent=4) - except Exception as e: - raise RuntimeError(f"保存配置文件失败: {str(e)}") + await asyncio.to_thread(write_config) + logger.info("配置文件已更新: %s", config_path) + except OSError as e: + msg = f"保存配置文件失败: {e!s}" + raise RuntimeError(msg) from e + else: + return server_id -async def query_mcp_server(api_client: ApiClient, mcp_id: str) -> Optional[dict[str, Any]]: +async def query_mcp_server(api_client: ApiClient, mcp_id: str) -> dict[str, Any] | None: """查询MCP服务状态""" - logger.debug(f"查询MCP服务状态: {mcp_id}") + logger.debug("查询MCP服务状态: %s", mcp_id) response = await api_client.request("GET", "/api/mcp") - if response.get("code") != 200: - raise RuntimeError(f"查询MCP服务失败: {response.get('message', '未知错误')}") + if response.get("code") != HTTP_OK: + msg = f"查询MCP服务失败: {response.get('message', '未知错误')}" + raise RuntimeError(msg) services = response.get("result", {}).get("services", []) for service in services: if service.get("mcpserviceId") == mcp_id: - logger.debug(f"MCP服务 {mcp_id} 状态: {service.get('status')}") + logger.debug("MCP服务 %s 状态: %s", mcp_id, service.get("status")) return service return None @@ -316,18 +378,19 @@ async def query_mcp_server(api_client: ApiClient, mcp_id: str) -> Optional[dict[ async def install_mcp_server(api_client: ApiClient, mcp_id: str) -> dict[str, Any] | None: """安装mcp服务""" - logger.info(f"安装MCP服务: {mcp_id}") + logger.info("安装MCP服务: %s", mcp_id) response = await api_client.request("GET", "/api/mcp") - if response.get("code") != 200: - raise RuntimeError(f"查询MCP服务列表失败: {response.get('message', '未知错误')}") + if response.get("code") != HTTP_OK: + msg = f"查询MCP服务列表失败: {response.get('message', '未知错误')}" + raise RuntimeError(msg) services = response.get("result", {}).get("services", []) for service in services: if service.get("mcpserviceId") == mcp_id: - logger.debug(f"MCP服务 {mcp_id} 状态: {service.get('status')}") + logger.debug("MCP服务 %s 状态: %s", mcp_id, service.get("status")) if service.get("status") != "ready": - logger.debug(f"开始安装MCP服务{mcp_id}") + logger.debug("开始安装MCP服务%s", mcp_id) return await api_client.request("POST", f"/api/mcp/{mcp_id}/install") break return None @@ -335,13 +398,13 @@ async def install_mcp_server(api_client: ApiClient, mcp_id: str) -> dict[str, An async def activate_mcp_server(api_client: ApiClient, mcp_id: str) -> dict[str, Any]: """激活mcp服务""" - logger.info(f"激活MCP服务: {mcp_id}") + logger.info("激活MCP服务: %s", mcp_id) return await api_client.request("POST", f"/api/mcp/{mcp_id}", json={"active": "true"}) async def deploy_app(api_client: ApiClient, app_id: str) -> dict[str, Any]: """发布应用""" - logger.info(f"发布应用: {app_id}") + logger.info("发布应用: %s", app_id) return await api_client.request("POST", f"/api/app/{app_id}", json={}) @@ -349,19 +412,21 @@ async def call_app_api(api_client: ApiClient, appdata: AppData) -> str: """创建智能体应用agent""" try: app_data_dict = appdata.model_dump(by_alias=True) - logger.debug(f"创建应用数据: {json.dumps(app_data_dict, ensure_ascii=False)}") + logger.debug("创建应用数据: %s", json.dumps(app_data_dict, ensure_ascii=False)) response = await api_client.request("POST", "/api/app", json=app_data_dict) app_id = response.get("result", {}).get("appId") if not app_id: - raise RuntimeError("创建应用未返回有效的appId") - - logger.info(f"应用创建成功,app_id: {app_id}") - return app_id + msg = "创建应用未返回有效的appId" + raise RuntimeError(msg) + logger.info("应用创建成功,app_id: %s", app_id) except ValidationError as e: - raise ValueError(f"应用数据验证失败: {str(e)}") + msg = f"应用数据验证失败: {e!s}" + raise ValueError(msg) from e + else: + return app_id async def get_app_list(api_client: ApiClient) -> str: @@ -371,7 +436,8 @@ async def get_app_list(api_client: ApiClient) -> str: return str(response) except ValidationError as e: - raise ValueError(f"应用数据验证失败: {str(e)}") + msg = f"应用数据验证失败: {e!s}" + raise ValueError(msg) from e async def comb_create(api_client: ApiClient, config_path: str) -> None: @@ -380,7 +446,8 @@ async def comb_create(api_client: ApiClient, config_path: str) -> None: mcp_services = config.get("mcpService", []) if not mcp_services: - raise ValueError("配置文件中未找到mcpService列表") + msg = "配置文件中未找到mcpService列表" + raise ValueError(msg) # 处理所有MCP服务 for service in mcp_services: @@ -398,8 +465,9 @@ async def comb_create(api_client: ApiClient, config_path: str) -> None: app_id = await call_app_api(api_client, app_data) await deploy_app(api_client, app_id) logger.info("组合创建流程完成") - except Exception as e: - raise RuntimeError(f"组合创建失败: {str(e)}") + except (ClientError, ClientResponseError, RuntimeError, ValueError) as e: + msg = f"组合创建失败: {e!s}" + raise RuntimeError(msg) from e async def create_agent(api_client: ApiClient, config_path: str) -> None: @@ -408,7 +476,8 @@ async def create_agent(api_client: ApiClient, config_path: str) -> None: service_id = config.get("serviceId") if not service_id: - raise ValueError("配置文件中未找到serviceId") + msg = "配置文件中未找到serviceId" + raise ValueError(msg) # 安装并等待服务就绪 await install_mcp_server(api_client, service_id) @@ -439,6 +508,7 @@ async def create_agent(api_client: ApiClient, config_path: str) -> None: async def main_async() -> None: + """主异步函数,处理命令行参数并执行相应的操作""" parser = argparse.ArgumentParser(description="MCP服务器管理工具") parser.add_argument( "operator", @@ -456,8 +526,8 @@ async def main_async() -> None: logger.setLevel(logging.DEBUG) # 验证配置文件路径 - if not args.config_path or not os.path.isfile(args.config_path): - logger.error(f"无效的配置文件路径: {args.config_path}") + if not args.config_path or not Path(args.config_path).is_file(): + logger.error("无效的配置文件路径: %s", args.config_path) sys.exit(1) # 创建API客户端 @@ -471,8 +541,8 @@ async def main_async() -> None: elif args.operator == "comb": await comb_create(api_client, args.config_path) logger.info("操作执行成功") - except Exception as e: - logger.error(f"操作失败: {str(e)}", exc_info=args.verbose) + except (ClientError, ClientResponseError, RuntimeError, ValueError, FileNotFoundError): + logger.exception("操作失败") sys.exit(1) finally: await api_client.close() @@ -480,6 +550,7 @@ async def main_async() -> None: def main() -> None: + """程序入口点""" try: asyncio.run(main_async()) except KeyboardInterrupt: diff --git a/scripts/deploy/4-other-script/init_agent.py b/scripts/deploy/4-other-script/init_agent.py index 4ba80a3880278c0dc1a68b6574d148e47764e4c1..d55db3e7b2defc9190ffa3eff7c8de6f41b98377 100644 --- a/scripts/deploy/4-other-script/init_agent.py +++ b/scripts/deploy/4-other-script/init_agent.py @@ -1,9 +1,28 @@ +""" +初始化智能体应用的脚本 + +此脚本用于查询MCP服务列表,激活未激活的服务, +并为每个服务创建对应的智能体应用。 +""" + +from __future__ import annotations + +import logging +from enum import Enum + import requests from pydantic import BaseModel, Field -from enum import Enum + +# 配置日志 +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) base_url = "http://127.0.0.1:8002" +# 常量定义 +SUCCESS_CODE = 200 +TIMEOUT = 10 + class AppType(str, Enum): """应用中心应用类型""" @@ -59,16 +78,17 @@ class AppData(BaseModel): name: str = Field(..., max_length=20, description="应用名称") description: str = Field(..., max_length=150, description="应用简介") links: list[AppLink] = Field(default=[], description="相关链接", max_length=5) - first_questions: list[str] = Field( - default=[], alias="recommendedQuestions", description="推荐问题", max_length=3) + first_questions: list[str] = Field(default=[], alias="recommendedQuestions", description="推荐问题", max_length=3) history_len: int = Field(3, alias="dialogRounds", ge=1, le=10, description="对话轮次(1~10)") permission: AppPermissionData = Field( - default_factory=lambda: AppPermissionData(authorizedUsers=None), description="权限配置") + default_factory=lambda: AppPermissionData(authorizedUsers=None), + description="权限配置", + ) workflows: list[AppFlowInfo] = Field(default=[], description="工作流信息列表") mcp_service: list[str] = Field(default=[], alias="mcpService", description="MCP服务id列表") -def call_mcp_api(): +def call_mcp_api() -> dict | None: """ 调用MCP API获取服务列表信息 @@ -80,30 +100,30 @@ def call_mcp_api(): try: # 发送GET请求 - response = requests.get(url, timeout=10) + response = requests.get(url, timeout=TIMEOUT) # 检查响应状态码 - if response.status_code == 200: + if response.status_code == SUCCESS_CODE: # 解析JSON响应 result = response.json() # 检查返回的code是否为200 - if result.get("code") == 200: - print("API调用成功") + if result.get("code") == SUCCESS_CODE: + logger.info("API调用成功") return result - else: - print(f"API返回错误: {result.get('message', '未知错误')}") - return None - else: - print(f"请求失败,状态码: {response.status_code}") + logger.error("API返回错误: %s", result.get("message", "未知错误")) return None - except requests.exceptions.RequestException as e: - print(f"请求发生异常: {str(e)}") + logger.error("请求失败,状态码: %s", response.status_code) + + except requests.exceptions.RequestException: + logger.exception("请求发生异常") + return None + else: return None -def call_app_api(appdata: AppData): +def call_app_api(appdata: AppData) -> str | None: """ 创建智能体应用agent @@ -114,141 +134,135 @@ def call_app_api(appdata: AppData): url = base_url + "/api/app" try: # 发送POST请求 - response = requests.post(url, json=appdata.model_dump(by_alias=True)) + response = requests.post(url, json=appdata.model_dump(by_alias=True), timeout=TIMEOUT) # 检查响应状态码 response.raise_for_status() # 如果状态码不是200,会抛出HTTPError异常 - print(f"创建智能体应用agent成功,name: {appdata.name}") - return response.json().get("result", {})["appId"] + logger.info("创建智能体应用agent成功,name: %s", appdata.name) + return response.json().get("result", {}).get("appId") - except requests.exceptions.HTTPError as e: - print(f"HTTP错误: {e}") + except requests.exceptions.HTTPError: + logger.exception("HTTP错误") except requests.exceptions.ConnectionError: - print("连接错误,无法连接到服务器") + logger.exception("连接错误,无法连接到服务器") except requests.exceptions.Timeout: - print("请求超时") - except requests.exceptions.RequestException as e: - print(f"请求发生错误: {e}") + logger.exception("请求超时") + except requests.exceptions.RequestException: + logger.exception("请求发生错误") return None -def query_mcp_server(mcp_id: str): - """ - 查询mcp服务状态 - """ +def query_mcp_server(mcp_id: str) -> dict | None: + """查询mcp服务状态""" url = base_url + "/api/mcp/" + mcp_id try: # 发送POST请求 - response = requests.get(url) + response = requests.get(url, timeout=TIMEOUT) # 检查响应状态码 response.raise_for_status() # 如果状态码不是200,会抛出HTTPError异常 - print(f"请求成功,状态码: {response.status_code}") + logger.info("请求成功,状态码: %s", response.status_code) return response.json() - except requests.exceptions.HTTPError as e: - print(f"HTTP错误: {e}") + except requests.exceptions.HTTPError: + logger.exception("HTTP错误") except requests.exceptions.ConnectionError: - print("连接错误,无法连接到服务器") + logger.exception("连接错误,无法连接到服务器") except requests.exceptions.Timeout: - print("请求超时") - except requests.exceptions.RequestException as e: - print(f"请求发生错误: {e}") + logger.exception("请求超时") + except requests.exceptions.RequestException: + logger.exception("请求发生错误") return None -def activate_mcp_server(mcp_id: str): - """ - 激活mcp服务 - """ +def activate_mcp_server(mcp_id: str) -> dict | None: + """激活mcp服务""" url = base_url + "/api/mcp/" + mcp_id body = {"active": "true"} try: # 发送POST请求 - response = requests.post(url, json=body) + response = requests.post(url, json=body, timeout=TIMEOUT) # 检查响应状态码 response.raise_for_status() # 如果状态码不是200,会抛出HTTPError异常 - print(f"激活mcp服务 mcp_id: {mcp_id} 成功") + logger.info("激活mcp服务 mcp_id: %s 成功", mcp_id) return response.json() - except requests.exceptions.HTTPError as e: - print(f"HTTP错误: {e}") + except requests.exceptions.HTTPError: + logger.exception("HTTP错误") except requests.exceptions.ConnectionError: - print("连接错误,无法连接到服务器") + logger.exception("连接错误,无法连接到服务器") except requests.exceptions.Timeout: - print("请求超时") - except requests.exceptions.RequestException as e: - print(f"请求发生错误: {e}") + logger.exception("请求超时") + except requests.exceptions.RequestException: + logger.exception("请求发生错误") return None -def deploy_app(app_id:str): - """ - 发布应用 - """ +def deploy_app(app_id: str) -> dict | None: + """发布应用""" url = base_url + "/api/app/" + app_id try: # 发送POST请求 - response = requests.post(url, json={}) + response = requests.post(url, json={}, timeout=TIMEOUT) # 检查响应状态码 response.raise_for_status() # 如果状态码不是200,会抛出HTTPError异常 - print(f"发布应用 app_id: {app_id} 成功") + logger.info("发布应用 app_id: %s 成功", app_id) return response.json() - except requests.exceptions.HTTPError as e: - print(f"HTTP错误: {e}") + except requests.exceptions.HTTPError: + logger.exception("HTTP错误") except requests.exceptions.ConnectionError: - print("连接错误,无法连接到服务器") + logger.exception("连接错误,无法连接到服务器") except requests.exceptions.Timeout: - print("请求超时") - except requests.exceptions.RequestException as e: - print(f"请求发生错误: {e}") + logger.exception("请求超时") + except requests.exceptions.RequestException: + logger.exception("请求发生错误") return None -def create_agent(): + +def create_agent() -> None: """ 主入口 + 查询mcp服务,依次激活,根据mcp服务创建agent """ api_result = call_mcp_api() if api_result: # 打印服务列表信息 services = api_result.get("result", {}).get("services", []) - print(f"\n共获取到 {len(services)} 个服务:") + logger.info("共获取到 %s 个服务", len(services)) for service in services: - print(f"\n服务ID: {service.get('mcpserviceId')}") - print(f"服务名称: {service.get('name')}") - print(f"服务状态: {service.get('status')}") - print(f"服务描述: {service.get('description')}") + logger.info("服务ID: %s", service.get("mcpserviceId")) + logger.info("服务名称: %s", service.get("name")) + logger.info("服务状态: %s", service.get("status")) + logger.info("服务描述: %s", service.get("description")) app_data = AppData( appType=AppType.AGENT, - description=service.get('description')[:150], + description=service.get("description")[:150], dialogRounds=3, icon="", - mcpService=[service.get('mcpserviceId')], - name=service.get('name'), - permission=AppPermissionData( - visibility="public", - authorizedUsers=[] - ) + mcpService=[service.get("mcpserviceId")], + name=service.get("name"), + permission=AppPermissionData(visibility=PermissionType.PUBLIC, authorizedUsers=[]), ) if not service.get("isActive"): - activate_mcp_server(service.get('mcpserviceId')) + activate_mcp_server(service.get("mcpserviceId")) app_id = call_app_api(app_data) - deploy_app(app_id) + if app_id: + deploy_app(app_id) # 使用示例 if __name__ == "__main__": - create_agent() \ No newline at end of file + create_agent()